This repository has been archived by the owner on Jan 27, 2021. It is now read-only.

Prometheus metrics scraper #54

Open
wants to merge 1 commit into base: master
30 changes: 30 additions & 0 deletions Gopkg.lock


44 changes: 44 additions & 0 deletions README.md
@@ -187,6 +187,7 @@ The following table lists the supported annotations for Kubernetes `Deployments`
| `osiris.deislabs.io/enabled` | Enable the zeroscaler component to scrape and analyze metrics from the deployment's pods and scale the deployment to zero when idle. Allowed values: `y`, `yes`, `true`, `on`, `1`. | _no value_ (= disabled) |
| `osiris.deislabs.io/minReplicas` | The minimum number of replicas to set on the deployment when Osiris scales it up. If you set `2`, Osiris will scale the deployment from `0` to `2` replicas directly. Osiris won't collect metrics from deployments that have more than `minReplicas` replicas, to avoid unnecessary metrics collection. | `1` |
| `osiris.deislabs.io/metricsCheckInterval` | The interval at which Osiris collects the pods' HTTP request metrics, in seconds. Note that this value overrides the global value defined by the `zeroscaler.metricsCheckInterval` Helm value. | _value of the `zeroscaler.metricsCheckInterval` Helm value_ |
| `osiris.deislabs.io/metricsCollector` | Configure the collection of metrics for a deployment's pods. The value is a JSON object with at least a `type` string, and an optional `implementation` object. See the *Metrics Scraping Configuration* section for more details. | `{ "type": "osiris" }` |

#### Pod Annotations

@@ -212,6 +213,49 @@ The following table lists the supported annotations for Kubernetes `Services` an

Note that you might see an `osiris.deislabs.io/selector` annotation - this is for internal use only, and you shouldn't set, update, or delete it.

#### Metrics Scraping Configuration

By default, metrics are scraped from the pods automatically by the Osiris-provided sidecar container. If you don't want to use the auto-injected sidecar container, you can configure a custom metrics scraper using the `osiris.deislabs.io/metricsCollector` annotation on your deployment.

The following scrapers are supported:

**osiris**

This is the default scraper, which doesn't need any configuration.
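
Since `{ "type": "osiris" }` is the annotation's default value, setting it explicitly is equivalent to omitting the annotation entirely:

```yaml
annotations:
  osiris.deislabs.io/metricsCollector: |
    { "type": "osiris" }
```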

**prometheus**

The prometheus scraper retrieves metrics about opened and closed connections from your own Prometheus endpoint. To use it, your application needs to expose an endpoint serving metrics in the Prometheus format (see the example application sketch below).
You can then set the following annotation:

```yaml
annotations:
osiris.deislabs.io/metricsCollector: |
{
"type": "prometheus",
"implementation": {
"port": 8080,
"path": "/metrics",
"openedConnectionsMetricName": "connections",
"openedConnectionsMetricLabels": {
"type": "opened"
},
"closedConnectionsMetricName": "connections",
"closedConnectionsMetricLabels": {
"type": "closed"
}
}
}
```
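
With the annotation above, Osiris would scrape each pod on port `8080` at `/metrics` and look for a `connections` metric carrying the configured labels. An endpoint compatible with this configuration would return something like the following (Prometheus text exposition format, with illustrative values):

```
# HELP connections Connections handled by the application.
# TYPE connections counter
connections{type="opened"} 42
connections{type="closed"} 40
```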

The schema of the prometheus implementation configuration is:
- a mandatory `port` integer
- an optional `path` string - defaults to `/metrics` if not set
- a mandatory `openedConnectionsMetricName` string, the name of the metric that exposes the number of opened connections
- a mandatory `closedConnectionsMetricName` string, the name of the metric that exposes the number of closed connections
- an optional `openedConnectionsMetricLabels` object, the labels that the opened-connections metric must match
- an optional `closedConnectionsMetricLabels` object, the labels that the closed-connections metric must match
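
As an illustration only (not part of Osiris), here is a minimal sketch of a Go application exposing matching `connections` counters with the Prometheus Go client. The port, metric name, and `type` label values are chosen to match the example annotation above, and the connection-state hook is just one way of producing such counters:

```go
package main

import (
	"log"
	"net"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// connections carries a "type" label so that the annotation's
// openedConnectionsMetricLabels / closedConnectionsMetricLabels can match it.
var connections = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Name: "connections",
		Help: "Connections handled by the server, by lifecycle state.",
	},
	[]string{"type"},
)

func main() {
	prometheus.MustRegister(connections)

	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		w.Write([]byte("hello"))
	})
	// Metrics endpoint matching the "port" and "path" of the example annotation.
	mux.Handle("/metrics", promhttp.Handler())

	srv := &http.Server{
		Addr:    ":8080",
		Handler: mux,
		// Count connections as they are opened and closed.
		ConnState: func(c net.Conn, state http.ConnState) {
			switch state {
			case http.StateNew:
				connections.WithLabelValues("opened").Inc()
			case http.StateClosed, http.StateHijacked:
				connections.WithLabelValues("closed").Inc()
			}
		},
	}
	log.Fatal(srv.ListenAndServe())
}
```

Any application or exporter that produces the same exposition format works equally well.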

### Demo

Deploy the [example application](example/hello-osiris.yaml) `hello-osiris`:
163 changes: 46 additions & 117 deletions pkg/deployments/zeroscaler/metrics_collector.go
@@ -3,14 +3,10 @@ package zeroscaler
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"

k8s "github.com/deislabs/osiris/pkg/kubernetes"
"github.com/deislabs/osiris/pkg/metrics"
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
@@ -19,52 +15,45 @@ import (
"k8s.io/client-go/tools/cache"
)

const (
proxyContainerName = "osiris-proxy"
proxyPortName = "osiris-metrics"
)

type metricsCollector struct {
kubeClient kubernetes.Interface
type metricsCollectorConfig struct {
deploymentName string
deploymentNamespace string
selector labels.Selector
metricsCheckInterval time.Duration
podsInformer cache.SharedIndexInformer
currentAppPods map[string]*corev1.Pod
allAppPodStats map[string]*podStats
appPodsLock sync.Mutex
httpClient *http.Client
cancelFunc func()
scraperConfig metricsScraperConfig
}

type metricsCollector struct {
config metricsCollectorConfig
scraper metricsScraper
kubeClient kubernetes.Interface
podsInformer cache.SharedIndexInformer
currentAppPods map[string]*corev1.Pod
allAppPodStats map[string]*podStats
appPodsLock sync.Mutex
cancelFunc func()
}

func newMetricsCollector(
kubeClient kubernetes.Interface,
deploymentName string,
deploymentNamespace string,
selector labels.Selector,
metricsCheckInterval time.Duration,
) *metricsCollector {
config metricsCollectorConfig,
) (*metricsCollector, error) {
s, err := newMetricsScraper(config.scraperConfig)
if err != nil {
return nil, err
}
m := &metricsCollector{
kubeClient: kubeClient,
deploymentName: deploymentName,
deploymentNamespace: deploymentNamespace,
selector: selector,
metricsCheckInterval: metricsCheckInterval,
config: config,
scraper: s,
kubeClient: kubeClient,
podsInformer: k8s.PodsIndexInformer(
kubeClient,
deploymentNamespace,
config.deploymentNamespace,
nil,
selector,
config.selector,
),
currentAppPods: map[string]*corev1.Pod{},
allAppPodStats: map[string]*podStats{},
// A very aggressive timeout. When collecting metrics, we want to do it very
// quickly to minimize the possibility that some pods we've checked on have
// served requests while we've been checking on OTHER pods.
httpClient: &http.Client{
Timeout: 2 * time.Second,
},
}
m.podsInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: m.syncAppPod,
@@ -73,7 +62,7 @@ func newMetricsCollector(
},
DeleteFunc: m.syncDeletedAppPod,
})
return m
return m, nil
}

func (m *metricsCollector) run(ctx context.Context) {
@@ -83,14 +72,14 @@ func (m *metricsCollector) run(ctx context.Context) {
<-ctx.Done()
glog.Infof(
"Stopping metrics collection for deployment %s in namespace %s",
m.deploymentName,
m.deploymentNamespace,
m.config.deploymentName,
m.config.deploymentNamespace,
)
}()
glog.Infof(
"Starting metrics collection for deployment %s in namespace %s",
m.deploymentName,
m.deploymentNamespace,
m.config.deploymentName,
m.config.deploymentNamespace,
)
go m.podsInformer.Run(ctx.Done())
// When this exits, the cancel func will stop the informer
@@ -123,7 +112,7 @@ func (m *metricsCollector) syncDeletedAppPod(obj interface{}) {
}

func (m *metricsCollector) collectMetrics(ctx context.Context) {
ticker := time.NewTicker(m.metricsCheckInterval)
ticker := time.NewTicker(m.config.metricsCheckInterval)
defer ticker.Stop()
var periodStartTime, periodEndTime *time.Time
for {
@@ -146,28 +135,19 @@ func (m *metricsCollector) collectMetrics(ctx context.Context) {
// Get metrics for all of the deployment's CURRENT pods.
var scrapeWG sync.WaitGroup
for _, pod := range m.currentAppPods {
podMetricsPort, ok := getMetricsPort(pod)
if !ok {
continue
}
url := fmt.Sprintf(
"http://%s:%d/metrics",
pod.Status.PodIP,
podMetricsPort,
)
scrapeWG.Add(1)
go func(podName string) {
go func(pod *corev1.Pod) {
defer scrapeWG.Done()
// Get the results
pcs, ok := m.scrape(url)
if ok {
ps := m.allAppPodStats[podName]
pcs := m.scraper.Scrap(pod)
if pcs != nil {
ps := m.allAppPodStats[pod.Name]
ps.prevStatTime = ps.recentStatTime
ps.prevStats = ps.recentStats
ps.recentStatTime = periodEndTime
ps.recentStats = &pcs
ps.recentStats = pcs
}
}(pod.Name)
}(pod)
}
// Wait until we're done checking all pods.
scrapeWG.Wait()
@@ -226,64 +206,11 @@ func (m *metricsCollector) collectMetrics(ctx context.Context) {
}
}

func getMetricsPort(pod *corev1.Pod) (int32, bool) {
for _, c := range pod.Spec.Containers {
if c.Name == proxyContainerName && len(c.Ports) > 0 {
for _, port := range c.Ports {
if port.Name == proxyPortName {
return port.ContainerPort, true
}
}
}
}
return 0, false
}

func (m *metricsCollector) scrape(
target string,
) (metrics.ProxyConnectionStats, bool) {
pcs := metrics.ProxyConnectionStats{}
// Requests made with this client time out after 2 seconds
resp, err := m.httpClient.Get(target)
if err != nil {
glog.Errorf("Error requesting metrics from %s: %s", target, err)
return pcs, false
}
defer resp.Body.Close()
if resp.StatusCode != 200 {
glog.Errorf(
"Received unexpected HTTP response code %d when requesting metrics "+
"from %s",
resp.StatusCode,
target,
)
return pcs, false
}
bodyBytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
glog.Errorf(
"Error reading metrics request response from %s: %s",
target,
err,
)
return pcs, false
}
if err := json.Unmarshal(bodyBytes, &pcs); err != nil {
glog.Errorf(
"Error umarshaling metrics request response from %s: %s",
target,
err,
)
return pcs, false
}
return pcs, true
}

func (m *metricsCollector) scaleToZero() {
glog.Infof(
"Scale to zero starting for deployment %s in namespace %s",
m.deploymentName,
m.deploymentNamespace,
m.config.deploymentName,
m.config.deploymentNamespace,
)

patches := []k8s.PatchOperation{{
@@ -292,23 +219,25 @@ func (m *metricsCollector) scaleToZero() {
Value: 0,
}}
patchesBytes, _ := json.Marshal(patches)
if _, err := m.kubeClient.AppsV1().Deployments(m.deploymentNamespace).Patch(
m.deploymentName,
if _, err := m.kubeClient.AppsV1().Deployments(
m.config.deploymentNamespace,
).Patch(
m.config.deploymentName,
k8s_types.JSONPatchType,
patchesBytes,
); err != nil {
glog.Errorf(
"Error scaling deployment %s in namespace %s to zero: %s",
m.deploymentName,
m.deploymentNamespace,
m.config.deploymentName,
m.config.deploymentNamespace,
err,
)
return
}

glog.Infof(
"Scaled deployment %s in namespace %s to zero",
m.deploymentName,
m.deploymentNamespace,
m.config.deploymentName,
m.config.deploymentNamespace,
)
}
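
Not shown in this file: the `metricsScraper` interface and the `newMetricsScraper(metricsScraperConfig) (metricsScraper, error)` constructor live elsewhere in this PR. A minimal sketch, inferred only from how the collector uses them above (the field names and JSON tags are assumptions):

```go
// Sketch inferred from this diff; the real definitions live in other files
// of the PR and may differ.
package zeroscaler

import (
	"encoding/json"

	"github.com/deislabs/osiris/pkg/metrics"
	corev1 "k8s.io/api/core/v1"
)

// metricsScraperConfig would mirror the JSON value of the
// osiris.deislabs.io/metricsCollector annotation: a "type" string plus an
// optional, scraper-specific "implementation" object.
type metricsScraperConfig struct {
	Type           string          `json:"type"`
	Implementation json.RawMessage `json:"implementation,omitempty"`
}

// metricsScraper returns connection stats for a single pod, or nil when the
// pod could not be scraped; the collector above checks for nil before
// recording the stats.
type metricsScraper interface {
	Scrap(pod *corev1.Pod) *metrics.ProxyConnectionStats
}
```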