Skip to content
This repository has been archived by the owner on Nov 7, 2022. It is now read-only.

Commit

Permalink
Add processor queue (#197)
Browse files Browse the repository at this point in the history
* Start adding pipeline to collector

This is the basis to do the work for actually implementing OC collector
in the short run. At first enable receivers:

1. OC Receiver
2. Jaeger
3. Zipkin

The queue processor is added but not connected to exporters by default,
although there are options to do so.

As previously indicated before there is some duplication of code
with the agent, this should be reduced as we implement the interfaces and
project coalesce around certain patterns.

The queue is missing its metrics and those will be added soon.
  • Loading branch information
Paulo Janotti authored Nov 17, 2018
1 parent fe3e416 commit 4f3bb2c
Show file tree
Hide file tree
Showing 22 changed files with 1,485 additions and 79 deletions.
10 changes: 10 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,13 @@ $ git checkout -b feature
$ git commit
$ git push fork feature
```

## General Notes

This project uses go 1.11 and Travis for CI.

Travis CI uses the Makefile with the default target, it is recommended to
run it before submitting your PR. It runs `gofmt -s` (simplify) and `golint`.

The dependencies are managed with `go mod` if you work with the sources under your
`$GOPATH` you need to set the environment variable `GO111MODULE=on`.
24 changes: 21 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -292,20 +292,38 @@ The collector is in its initial development stages. It can be run directly
from sources, binary, or a Docker image.

1. Run from sources:
```
```shell
$ go run github.com/census-instrumentation/opencensus-service/cmd/occollector
```
2. Run from binary (from the root of your repo):
```
```shell
$ make collector
$ ./bin/occollector_$($GOOS)
```
3. Build a Docker scratch image and use the appropria Docker command for your scenario:
```
```shell
$ make docker-collector
$ docker run --rm -it -p 55678:55678 occollector
```

4. It can be configured via command-line or config file:
```shell
OpenCensus Collector
Usage:
occollector [flags]
Flags:
--add-queued-processor Flag to wrap one processor with the queued processor (flag will be remove soon, dev helper)
--config string Path to the config file
-h, --help help for occollector
--log-level string Output level of logs (TRACE, DEBUG, INFO, WARN, ERROR, FATAL) (default "INFO")
--noop-processor Flag to add the no-op processor (combine with log level DEBUG to log incoming spans)
--receive-jaeger Flag to run the Jaeger receiver (i.e.: Jaeger Collector), default settings: {ThriftTChannelPort:14267 ThriftHTTPPort:14268}
--receive-oc-trace Flag to run the OpenCensus trace receiver, default settings: {Port:55678}
--receive-zipkin Flag to run the Zipkin receiver, default settings: {Port:9411}
```

[travis-image]: https://travis-ci.org/census-instrumentation/opencensus-service.svg?branch=master
[travis-url]: https://travis-ci.org/census-instrumentation/opencensus-service
[godoc-image]: https://godoc.org/github.com/census-instrumentation/opencensus-service?status.svg
Expand Down
138 changes: 138 additions & 0 deletions cmd/occollector/app/builder/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
// Copyright 2018, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package builder handles the options to build the OpenCensus collector
// pipeline.
package builder

import (
"fmt"
"strings"

"github.com/spf13/viper"
)

const (
receiversRoot = "receivers"
jaegerEntry = "jaeger"
opencensusEntry = "opencensus"
zipkinEntry = "zipkin"
)

// JaegerReceiverCfg holds configuration for Jaeger receivers.
type JaegerReceiverCfg struct {
// ThriftTChannelPort is the port that the relay receives on for jaeger thrift tchannel requests
ThriftTChannelPort int `mapstructure:"thrift-tchannel-port"`
// ThriftHTTPPort is the port that the relay receives on for jaeger thrift http requests
ThriftHTTPPort int `mapstructure:"thrift-http-port"`
}

// JaegerReceiverEnabled checks if the Jaeger receiver is enabled, via a command-line flag, environment
// variable, or configuration file.
func JaegerReceiverEnabled(v *viper.Viper, cmdFlag string) bool {
return featureEnabled(v, cmdFlag, receiversRoot, jaegerEntry)
}

// NewDefaultJaegerReceiverCfg returns an instance of JaegerReceiverCfg with default values
func NewDefaultJaegerReceiverCfg() *JaegerReceiverCfg {
opts := &JaegerReceiverCfg{
ThriftTChannelPort: 14267,
ThriftHTTPPort: 14268,
}
return opts
}

// InitFromViper returns a JaegerReceiverCfg according to the configuration.
func (cfg *JaegerReceiverCfg) InitFromViper(v *viper.Viper) (*JaegerReceiverCfg, error) {
return cfg, initFromViper(cfg, v, receiversRoot, jaegerEntry)
}

// OpenCensusReceiverCfg holds configuration for OpenCensus receiver.
type OpenCensusReceiverCfg struct {
// Port is the port that the receiver will use
Port int `mapstructure:"port"`
}

// OpenCensusReceiverEnabled checks if the OpenCensus receiver is enabled, via a command-line flag, environment
// variable, or configuration file.
func OpenCensusReceiverEnabled(v *viper.Viper, cmdFlag string) bool {
return featureEnabled(v, cmdFlag, receiversRoot, opencensusEntry)
}

// NewDefaultOpenCensusReceiverCfg returns an instance of OpenCensusReceiverCfg with default values
func NewDefaultOpenCensusReceiverCfg() *OpenCensusReceiverCfg {
opts := &OpenCensusReceiverCfg{
Port: 55678,
}
return opts
}

// InitFromViper returns a OpenCensusReceiverCfg according to the configuration.
func (cfg *OpenCensusReceiverCfg) InitFromViper(v *viper.Viper) (*OpenCensusReceiverCfg, error) {
return cfg, initFromViper(cfg, v, receiversRoot, opencensusEntry)
}

// ZipkinReceiverCfg holds configuration for Zipkin receiver.
type ZipkinReceiverCfg struct {
// Port is the port that the receiver will use
Port int `mapstructure:"port"`
}

// ZipkinReceiverEnabled checks if the Zipkin receiver is enabled, via a command-line flag, environment
// variable, or configuration file.
func ZipkinReceiverEnabled(v *viper.Viper, cmdFlag string) bool {
return featureEnabled(v, cmdFlag, receiversRoot, zipkinEntry)
}

// NewDefaultZipkinReceiverCfg returns an instance of ZipkinReceiverCfg with default values
func NewDefaultZipkinReceiverCfg() *ZipkinReceiverCfg {
opts := &ZipkinReceiverCfg{
Port: 9411,
}
return opts
}

// InitFromViper returns a ZipkinReceiverCfg according to the configuration.
func (cfg *ZipkinReceiverCfg) InitFromViper(v *viper.Viper) (*ZipkinReceiverCfg, error) {
return cfg, initFromViper(cfg, v, receiversRoot, zipkinEntry)
}

// Helper functions

func initFromViper(cfg interface{}, v *viper.Viper, labels ...string) error {
v = getViperSub(v, labels...)
if v == nil {
return nil
}
if err := v.Unmarshal(cfg); err != nil {
return fmt.Errorf("Failed to read configuration for %s %v", strings.Join(labels, ": "), err)
}

return nil
}

func getViperSub(v *viper.Viper, labels ...string) *viper.Viper {
for _, label := range labels {
v = v.Sub(label)
if v == nil {
return nil
}
}

return v
}

func featureEnabled(v *viper.Viper, cmdFlag string, labels ...string) bool {
return v.GetBool(cmdFlag) || (getViperSub(v, labels...) != nil)
}
80 changes: 80 additions & 0 deletions cmd/occollector/app/builder/builder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2018, OpenCensus Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package builder

import (
"reflect"
"testing"

"github.com/spf13/viper"
)

func TestReceiversEnabledByPresenceWithDefaultSettings(t *testing.T) {
v, err := loadViperFromFile("./testdata/receivers_enabled.yaml")
if err != nil {
t.Fatalf("Failed to load viper from test file: %v", err)
}

jaegerEnabled, opencensusEnabled, zipkinEnabled := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z")
if !jaegerEnabled || !opencensusEnabled || !zipkinEnabled {
t.Fatalf("Some of the expected receivers were not enabled j:%v oc:%v z:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled)
}

wj := NewDefaultJaegerReceiverCfg()
gj, err := wj.InitFromViper(v)
if err != nil {
t.Errorf("Failed to InitFromViper for Jaeger receiver: %v", err)
} else if !reflect.DeepEqual(wj, gj) {
t.Errorf("Incorrect config for Jaeger receiver, want %v got %v", wj, gj)
}

woc := NewDefaultOpenCensusReceiverCfg()
goc, err := woc.InitFromViper(v)
if err != nil {
t.Errorf("Failed to InitFromViper for OpenCensus receiver: %v", err)
} else if !reflect.DeepEqual(woc, goc) {
t.Errorf("Incorrect config for OpenCensus receiver, want %v got %v", woc, goc)
}

wz := NewDefaultZipkinReceiverCfg()
gz, err := wz.InitFromViper(v)
if err != nil {
t.Errorf("Failed to InitFromViper for Zipkin receiver: %v", err)
} else if !reflect.DeepEqual(wz, gz) {
t.Errorf("Incorrect config for Zipkin receiver, want %v got %v", wz, gz)
}
}

func TestReceiversDisabledByPresenceWithDefaultSettings(t *testing.T) {
v, err := loadViperFromFile("./testdata/receivers_disabled.yaml")
if err != nil {
t.Fatalf("Failed to load viper from test file: %v", err)
}

jaegerEnabled, opencensusEnabled, zipkinEnabled := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z")
if jaegerEnabled || opencensusEnabled || zipkinEnabled {
t.Fatalf("Not all receivers were disabled j:%v oc:%v z:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled)
}
}

func loadViperFromFile(file string) (*viper.Viper, error) {
v := viper.New()
v.SetConfigFile(file)
err := v.ReadInConfig()
if err != nil {
return nil, err
}
return v, nil
}
5 changes: 5 additions & 0 deletions cmd/occollector/app/builder/testdata/receivers_disabled.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# No receivers enabled, all commented out
receivers:
# jaeger: {}
# opencensus: {}
# zipkin: {}
5 changes: 5 additions & 0 deletions cmd/occollector/app/builder/testdata/receivers_enabled.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Enable receivers with default configuration
receivers:
jaeger: {}
opencensus: {}
zipkin: {}
Loading

0 comments on commit 4f3bb2c

Please sign in to comment.