diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8b41cba0..0bf13ced 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -53,3 +53,13 @@ $ git checkout -b feature $ git commit $ git push fork feature ``` + +## General Notes + +This project uses go 1.11 and Travis for CI. + +Travis CI uses the Makefile with the default target, it is recommended to +run it before submitting your PR. It runs `gofmt -s` (simplify) and `golint`. + +The dependencies are managed with `go mod` if you work with the sources under your +`$GOPATH` you need to set the environment variable `GO111MODULE=on`. \ No newline at end of file diff --git a/README.md b/README.md index c4a1389a..c1302a17 100644 --- a/README.md +++ b/README.md @@ -292,20 +292,38 @@ The collector is in its initial development stages. It can be run directly from sources, binary, or a Docker image. 1. Run from sources: -``` +```shell $ go run github.com/census-instrumentation/opencensus-service/cmd/occollector ``` 2. Run from binary (from the root of your repo): -``` +```shell $ make collector $ ./bin/occollector_$($GOOS) ``` 3. Build a Docker scratch image and use the appropria Docker command for your scenario: -``` +```shell $ make docker-collector $ docker run --rm -it -p 55678:55678 occollector ``` +4. It can be configured via command-line or config file: +```shell +OpenCensus Collector + +Usage: + occollector [flags] + +Flags: + --add-queued-processor Flag to wrap one processor with the queued processor (flag will be remove soon, dev helper) + --config string Path to the config file + -h, --help help for occollector + --log-level string Output level of logs (TRACE, DEBUG, INFO, WARN, ERROR, FATAL) (default "INFO") + --noop-processor Flag to add the no-op processor (combine with log level DEBUG to log incoming spans) + --receive-jaeger Flag to run the Jaeger receiver (i.e.: Jaeger Collector), default settings: {ThriftTChannelPort:14267 ThriftHTTPPort:14268} + --receive-oc-trace Flag to run the OpenCensus trace receiver, default settings: {Port:55678} + --receive-zipkin Flag to run the Zipkin receiver, default settings: {Port:9411} +``` + [travis-image]: https://travis-ci.org/census-instrumentation/opencensus-service.svg?branch=master [travis-url]: https://travis-ci.org/census-instrumentation/opencensus-service [godoc-image]: https://godoc.org/github.com/census-instrumentation/opencensus-service?status.svg diff --git a/cmd/occollector/app/builder/builder.go b/cmd/occollector/app/builder/builder.go new file mode 100644 index 00000000..de8e753c --- /dev/null +++ b/cmd/occollector/app/builder/builder.go @@ -0,0 +1,138 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package builder handles the options to build the OpenCensus collector +// pipeline. +package builder + +import ( + "fmt" + "strings" + + "github.com/spf13/viper" +) + +const ( + receiversRoot = "receivers" + jaegerEntry = "jaeger" + opencensusEntry = "opencensus" + zipkinEntry = "zipkin" +) + +// JaegerReceiverCfg holds configuration for Jaeger receivers. +type JaegerReceiverCfg struct { + // ThriftTChannelPort is the port that the relay receives on for jaeger thrift tchannel requests + ThriftTChannelPort int `mapstructure:"thrift-tchannel-port"` + // ThriftHTTPPort is the port that the relay receives on for jaeger thrift http requests + ThriftHTTPPort int `mapstructure:"thrift-http-port"` +} + +// JaegerReceiverEnabled checks if the Jaeger receiver is enabled, via a command-line flag, environment +// variable, or configuration file. +func JaegerReceiverEnabled(v *viper.Viper, cmdFlag string) bool { + return featureEnabled(v, cmdFlag, receiversRoot, jaegerEntry) +} + +// NewDefaultJaegerReceiverCfg returns an instance of JaegerReceiverCfg with default values +func NewDefaultJaegerReceiverCfg() *JaegerReceiverCfg { + opts := &JaegerReceiverCfg{ + ThriftTChannelPort: 14267, + ThriftHTTPPort: 14268, + } + return opts +} + +// InitFromViper returns a JaegerReceiverCfg according to the configuration. +func (cfg *JaegerReceiverCfg) InitFromViper(v *viper.Viper) (*JaegerReceiverCfg, error) { + return cfg, initFromViper(cfg, v, receiversRoot, jaegerEntry) +} + +// OpenCensusReceiverCfg holds configuration for OpenCensus receiver. +type OpenCensusReceiverCfg struct { + // Port is the port that the receiver will use + Port int `mapstructure:"port"` +} + +// OpenCensusReceiverEnabled checks if the OpenCensus receiver is enabled, via a command-line flag, environment +// variable, or configuration file. +func OpenCensusReceiverEnabled(v *viper.Viper, cmdFlag string) bool { + return featureEnabled(v, cmdFlag, receiversRoot, opencensusEntry) +} + +// NewDefaultOpenCensusReceiverCfg returns an instance of OpenCensusReceiverCfg with default values +func NewDefaultOpenCensusReceiverCfg() *OpenCensusReceiverCfg { + opts := &OpenCensusReceiverCfg{ + Port: 55678, + } + return opts +} + +// InitFromViper returns a OpenCensusReceiverCfg according to the configuration. +func (cfg *OpenCensusReceiverCfg) InitFromViper(v *viper.Viper) (*OpenCensusReceiverCfg, error) { + return cfg, initFromViper(cfg, v, receiversRoot, opencensusEntry) +} + +// ZipkinReceiverCfg holds configuration for Zipkin receiver. +type ZipkinReceiverCfg struct { + // Port is the port that the receiver will use + Port int `mapstructure:"port"` +} + +// ZipkinReceiverEnabled checks if the Zipkin receiver is enabled, via a command-line flag, environment +// variable, or configuration file. +func ZipkinReceiverEnabled(v *viper.Viper, cmdFlag string) bool { + return featureEnabled(v, cmdFlag, receiversRoot, zipkinEntry) +} + +// NewDefaultZipkinReceiverCfg returns an instance of ZipkinReceiverCfg with default values +func NewDefaultZipkinReceiverCfg() *ZipkinReceiverCfg { + opts := &ZipkinReceiverCfg{ + Port: 9411, + } + return opts +} + +// InitFromViper returns a ZipkinReceiverCfg according to the configuration. +func (cfg *ZipkinReceiverCfg) InitFromViper(v *viper.Viper) (*ZipkinReceiverCfg, error) { + return cfg, initFromViper(cfg, v, receiversRoot, zipkinEntry) +} + +// Helper functions + +func initFromViper(cfg interface{}, v *viper.Viper, labels ...string) error { + v = getViperSub(v, labels...) + if v == nil { + return nil + } + if err := v.Unmarshal(cfg); err != nil { + return fmt.Errorf("Failed to read configuration for %s %v", strings.Join(labels, ": "), err) + } + + return nil +} + +func getViperSub(v *viper.Viper, labels ...string) *viper.Viper { + for _, label := range labels { + v = v.Sub(label) + if v == nil { + return nil + } + } + + return v +} + +func featureEnabled(v *viper.Viper, cmdFlag string, labels ...string) bool { + return v.GetBool(cmdFlag) || (getViperSub(v, labels...) != nil) +} diff --git a/cmd/occollector/app/builder/builder_test.go b/cmd/occollector/app/builder/builder_test.go new file mode 100644 index 00000000..4f19c1eb --- /dev/null +++ b/cmd/occollector/app/builder/builder_test.go @@ -0,0 +1,80 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "reflect" + "testing" + + "github.com/spf13/viper" +) + +func TestReceiversEnabledByPresenceWithDefaultSettings(t *testing.T) { + v, err := loadViperFromFile("./testdata/receivers_enabled.yaml") + if err != nil { + t.Fatalf("Failed to load viper from test file: %v", err) + } + + jaegerEnabled, opencensusEnabled, zipkinEnabled := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z") + if !jaegerEnabled || !opencensusEnabled || !zipkinEnabled { + t.Fatalf("Some of the expected receivers were not enabled j:%v oc:%v z:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled) + } + + wj := NewDefaultJaegerReceiverCfg() + gj, err := wj.InitFromViper(v) + if err != nil { + t.Errorf("Failed to InitFromViper for Jaeger receiver: %v", err) + } else if !reflect.DeepEqual(wj, gj) { + t.Errorf("Incorrect config for Jaeger receiver, want %v got %v", wj, gj) + } + + woc := NewDefaultOpenCensusReceiverCfg() + goc, err := woc.InitFromViper(v) + if err != nil { + t.Errorf("Failed to InitFromViper for OpenCensus receiver: %v", err) + } else if !reflect.DeepEqual(woc, goc) { + t.Errorf("Incorrect config for OpenCensus receiver, want %v got %v", woc, goc) + } + + wz := NewDefaultZipkinReceiverCfg() + gz, err := wz.InitFromViper(v) + if err != nil { + t.Errorf("Failed to InitFromViper for Zipkin receiver: %v", err) + } else if !reflect.DeepEqual(wz, gz) { + t.Errorf("Incorrect config for Zipkin receiver, want %v got %v", wz, gz) + } +} + +func TestReceiversDisabledByPresenceWithDefaultSettings(t *testing.T) { + v, err := loadViperFromFile("./testdata/receivers_disabled.yaml") + if err != nil { + t.Fatalf("Failed to load viper from test file: %v", err) + } + + jaegerEnabled, opencensusEnabled, zipkinEnabled := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z") + if jaegerEnabled || opencensusEnabled || zipkinEnabled { + t.Fatalf("Not all receivers were disabled j:%v oc:%v z:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled) + } +} + +func loadViperFromFile(file string) (*viper.Viper, error) { + v := viper.New() + v.SetConfigFile(file) + err := v.ReadInConfig() + if err != nil { + return nil, err + } + return v, nil +} diff --git a/cmd/occollector/app/builder/testdata/receivers_disabled.yaml b/cmd/occollector/app/builder/testdata/receivers_disabled.yaml new file mode 100644 index 00000000..249d7d75 --- /dev/null +++ b/cmd/occollector/app/builder/testdata/receivers_disabled.yaml @@ -0,0 +1,5 @@ +# No receivers enabled, all commented out +receivers: + # jaeger: {} + # opencensus: {} + # zipkin: {} diff --git a/cmd/occollector/app/builder/testdata/receivers_enabled.yaml b/cmd/occollector/app/builder/testdata/receivers_enabled.yaml new file mode 100644 index 00000000..1287369a --- /dev/null +++ b/cmd/occollector/app/builder/testdata/receivers_enabled.yaml @@ -0,0 +1,5 @@ +# Enable receivers with default configuration +receivers: + jaeger: {} + opencensus: {} + zipkin: {} diff --git a/cmd/occollector/app/collector/collector.go b/cmd/occollector/app/collector/collector.go new file mode 100644 index 00000000..dc3cae6b --- /dev/null +++ b/cmd/occollector/app/collector/collector.go @@ -0,0 +1,268 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package collector handles the command-line, configuration, and runs the OC collector. +package collector + +import ( + "fmt" + "io/ioutil" + "log" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/exporter" + "github.com/census-instrumentation/opencensus-service/exporter/exporterparser" + "github.com/census-instrumentation/opencensus-service/internal/collector/jaeger" + "github.com/census-instrumentation/opencensus-service/internal/collector/opencensus" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + "github.com/census-instrumentation/opencensus-service/internal/collector/zipkin" +) + +const ( + configCfg = "config" + logLevelCfg = "log-level" + jaegerReceiverFlg = "receive-jaeger" + ocReceiverFlg = "receive-oc-trace" + zipkinReceiverFlg = "receive-zipkin" + debugProcessorFlg = "debug-processor" + queuedProcessorFlg = "add-queued-processor" // TODO: (@pjanotti) this is temporary flag until it can be read from config. +) + +var ( + config string + + v = viper.New() + + logger *zap.Logger + + rootCmd = &cobra.Command{ + Use: "occollector", + Long: "OpenCensus Collector", + Run: func(cmd *cobra.Command, args []string) { + if file := v.GetString(configCfg); file != "" { + v.SetConfigFile(file) + err := v.ReadInConfig() + if err != nil { + log.Fatalf("Error loading config file %q: %v", file, err) + return + } + } + + execute() + }, + } +) + +func init() { + rootCmd.PersistentFlags().String(logLevelCfg, "INFO", "Output level of logs (TRACE, DEBUG, INFO, WARN, ERROR, FATAL)") + v.BindPFlag(logLevelCfg, rootCmd.PersistentFlags().Lookup(logLevelCfg)) + + // local flags + rootCmd.Flags().StringVar(&config, configCfg, "", "Path to the config file") + rootCmd.Flags().Bool(jaegerReceiverFlg, false, + fmt.Sprintf("Flag to run the Jaeger receiver (i.e.: Jaeger Collector), default settings: %+v", *builder.NewDefaultJaegerReceiverCfg())) + rootCmd.Flags().Bool(ocReceiverFlg, false, + fmt.Sprintf("Flag to run the OpenCensus trace receiver, default settings: %+v", *builder.NewDefaultOpenCensusReceiverCfg())) + rootCmd.Flags().Bool(zipkinReceiverFlg, false, + fmt.Sprintf("Flag to run the Zipkin receiver, default settings: %+v", *builder.NewDefaultZipkinReceiverCfg())) + rootCmd.Flags().Bool(debugProcessorFlg, false, "Flag to add a debug processor (combine with log level DEBUG to log incoming spans)") + rootCmd.Flags().Bool(queuedProcessorFlg, false, "Flag to wrap one processor with the queued processor (flag will be remove soon, dev helper)") + + // TODO: (@pjanotti) add builder options as flags, before calls bellow. Likely it will require code re-org. + + v.AutomaticEnv() + v.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "_")) + v.BindPFlags(rootCmd.Flags()) +} + +func newLogger() (*zap.Logger, error) { + var level zapcore.Level + err := (&level).UnmarshalText([]byte(v.GetString(logLevelCfg))) + if err != nil { + return nil, err + } + conf := zap.NewProductionConfig() + conf.Level.SetLevel(level) + return conf.Build() +} + +func execute() { + var signalsChannel = make(chan os.Signal) + signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) + + var err error + logger, err = newLogger() + if err != nil { + log.Fatalf("Failed to get logger: %v", err) + } + + logger.Info("Starting...") + + // Build pipeline from its end: 1st exporters, the OC-proto queue processor, and + // finally the receivers. + + var closeFns []func() + var spanProcessors []processor.SpanProcessor + exportersCloseFns, exporters := createExporters() + closeFns = append(closeFns, exportersCloseFns...) + if len(exporters) > 0 { + // Exporters need an extra hop from OC-proto to span data: to workaround that for now + // we will use a special processor that transforms the data to a format that they can consume. + // TODO: (@pjanotti) we should avoid this step in the long run, its an extra hop just to re-use + // the exporters: this can lose node information and it is not ideal for performance and delegates + // the retry/buffering to the exporters (that are designed to run within the tracing process). + spanProcessors = append(spanProcessors, processor.NewTraceExporterProcessor(exporters...)) + } + + if v.GetBool(debugProcessorFlg) { + spanProcessors = append(spanProcessors, processor.NewNoopSpanProcessor(logger)) + } + + if len(spanProcessors) == 0 { + logger.Warn("Nothing to do: no processor was enabled. Shutting down.") + os.Exit(1) + } + + // TODO: (@pjanotti) construct queued processor from config options, for now just to exercise it, wrap one around + // the first span processor available. + if v.GetBool(queuedProcessorFlg) { + spanProcessors[0] = processor.NewQueuedSpanProcessor(spanProcessors[0]) + } + + // Wraps processors in a single one to be connected to all enabled receivers. + spanProcessor := processor.NewMultiSpanProcessor(spanProcessors...) + + receiversCloseFns := createReceivers(spanProcessor) + closeFns = append(closeFns, receiversCloseFns...) + + logger.Info("Collector is up and running.") + + <-signalsChannel + logger.Info("Starting shutdown...") + + // TODO: orderly shutdown: first receivers, then flushing pipelines giving + // senders a chance to send all their data. This may take time, the allowed + // time should be part of configuration. + for i := len(closeFns) - 1; i > 0; i-- { + closeFns[i]() + } + + logger.Info("Shutdown complete.") +} + +func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporter) { + // TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility. + parseFns := []struct { + name string + fn func([]byte) ([]exporter.TraceExporter, []func() error, error) + }{ + {name: "datadog", fn: exporterparser.DatadogTraceExportersFromYAML}, + {name: "stackdriver", fn: exporterparser.StackdriverTraceExportersFromYAML}, + {name: "zipkin", fn: exporterparser.ZipkinExportersFromYAML}, + {name: "jaeger", fn: exporterparser.JaegerExportersFromYAML}, + {name: "kafka", fn: exporterparser.KafkaExportersFromYAML}, + } + + if config == "" { + logger.Info("No config file, exporters can be only configured via the file.") + return + } + + cfgBlob, err := ioutil.ReadFile(config) + if err != nil { + logger.Fatal("Cannot read config file for exporters", zap.Error(err)) + } + + for _, cfg := range parseFns { + tes, tesDoneFns, err := cfg.fn(cfgBlob) + if err != nil { + logger.Fatal("Failed to create config for exporter", zap.String("exporter", cfg.name), zap.Error(err)) + } + + wasEnabled := false + for _, te := range tes { + if te != nil { + wasEnabled = true + traceExporters = append(traceExporters, te) + } + } + + for _, tesDoneFn := range tesDoneFns { + if tesDoneFn != nil { + wrapperFn := func() { + if err := tesDoneFn(); err != nil { + logger.Warn("Error when closing exporter", zap.String("exporter", cfg.name), zap.Error(err)) + } + } + doneFns = append(doneFns, wrapperFn) + } + } + + if wasEnabled { + logger.Info("Exporter enabled", zap.String("exporter", cfg.name)) + } + } + + return doneFns, traceExporters +} + +func createReceivers(spanProcessor processor.SpanProcessor) (closeFns []func()) { + var someReceiverEnabled bool + receivers := []struct { + name string + runFn func(*zap.Logger, *viper.Viper, processor.SpanProcessor) (func(), error) + enabled bool + }{ + {"Jaeger", jaegerreceiver.Run, builder.JaegerReceiverEnabled(v, jaegerReceiverFlg)}, + {"OpenCensus", ocreceiver.Run, builder.OpenCensusReceiverEnabled(v, ocReceiverFlg)}, + {"Zipkin", zipkinreceiver.Run, builder.ZipkinReceiverEnabled(v, zipkinReceiverFlg)}, + } + + for _, receiver := range receivers { + if receiver.enabled { + closeSrv, err := receiver.runFn(logger, v, spanProcessor) + if err != nil { + // TODO: (@pjanotti) better shutdown, for now just try to stop any started receiver before terminating. + for _, closeFn := range closeFns { + closeFn() + } + logger.Fatal("Cannot run receiver for "+receiver.name, zap.Error(err)) + } + closeFns = append(closeFns, closeSrv) + someReceiverEnabled = true + } + } + + if !someReceiverEnabled { + logger.Warn("Nothing to do: no receiver was enabled. Shutting down.") + os.Exit(1) + } + + return closeFns +} + +// Execute the application according to the command and configuration given +// by the user. +func Execute() error { + return rootCmd.Execute() +} diff --git a/cmd/occollector/main.go b/cmd/occollector/main.go index 1e8a12f9..8b12f534 100644 --- a/cmd/occollector/main.go +++ b/cmd/occollector/main.go @@ -20,86 +20,13 @@ package main import ( - "context" - "fmt" "log" - "os" - "os/signal" - "syscall" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - "github.com/census-instrumentation/opencensus-service/receiver/opencensus" - "github.com/census-instrumentation/opencensus-service/spansink" - - "go.opencensus.io/plugin/ocgrpc" - "go.opencensus.io/stats/view" - "go.uber.org/zap" -) - -const ( - defaultOCAddress = ":55678" + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/collector" ) func main() { - var signalsChannel = make(chan os.Signal) - signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) - - // TODO: Allow configuration of logger and other items such as servers, etc. - logger, err := zap.NewProduction() - if err != nil { - log.Fatalf("Failed to get production logger: %v", err) - } - - logger.Info("Starting...") - - closeSrv, err := runOCServerWithReceiver(defaultOCAddress, logger) - if err != nil { - logger.Fatal("Cannot run opencensus server", zap.String("Address", defaultOCAddress), zap.Error(err)) - } - - logger.Info("Collector is up and running.") - - <-signalsChannel - logger.Info("Starting shutdown...") - - // TODO: orderly shutdown: first receivers, then flushing pipelines giving - // senders a chance to send all their data. This may take time, the allowed - // time should be part of configuration. - closeSrv() - - logger.Info("Shutdown complete.") -} - -func runOCServerWithReceiver(addr string, logger *zap.Logger) (func() error, error) { - if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { - return nil, fmt.Errorf("Failed to register ocgrpc.DefaultServerViews: %v", err) - } - - sr := &fakeSpanSink{ - logger: logger, + if err := collector.Execute(); err != nil { + log.Fatalf("Failed to run the collector: %v", err) } - - ocr, err := opencensus.New(addr) - if err != nil { - return nil, fmt.Errorf("Failed to create the OpenCensus receiver: %v", err) - } - if err := ocr.StartTraceReception(context.Background(), sr); err != nil { - return nil, fmt.Errorf("Failed to start OpenCensus TraceReceiver : %v", err) - } - return ocr.Stop, nil -} - -type fakeSpanSink struct { - logger *zap.Logger -} - -func (sr *fakeSpanSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { - ack := &spansink.Acknowledgement{ - SavedSpans: uint64(len(spans)), - } - - sr.logger.Info("ReceivedSpans", zap.Uint64("Received spans", ack.SavedSpans)) - - return ack, nil } diff --git a/go.mod b/go.mod index 0ca70cfe..2146fae5 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,12 @@ require ( contrib.go.opencensus.io/exporter/ocagent v0.3.0 contrib.go.opencensus.io/exporter/stackdriver v0.7.0 git.apache.org/thrift.git v0.0.0-20181101003639-92be4f312b88 // indirect + github.com/BurntSushi/toml v0.3.1 // indirect github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895 // indirect github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20181026070331-e7c4bd17b329 github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 // indirect github.com/aws/aws-sdk-go v1.15.68 // indirect + github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect github.com/census-instrumentation/opencensus-proto v0.1.0 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/gogo/googleapis v1.1.0 // indirect @@ -20,13 +22,18 @@ require ( github.com/jaegertracing/jaeger v1.7.0 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect github.com/kr/pretty v0.1.0 // indirect + github.com/mitchellh/mapstructure v1.1.2 // indirect github.com/opentracing/opentracing-go v1.0.2 // indirect github.com/openzipkin/zipkin-go v0.1.3 github.com/philhofer/fwd v1.0.0 // indirect github.com/pkg/errors v0.8.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a // indirect github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.3 // indirect + github.com/spf13/viper v1.2.1 + github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 // indirect + github.com/stretchr/objx v0.1.1 // indirect github.com/stretchr/testify v1.2.2 // indirect github.com/tinylib/msgp v1.0.2 // indirect github.com/uber-go/atomic v1.3.2 // indirect diff --git a/go.sum b/go.sum index ddd5c0eb..dde3dbdd 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999 h1:sihTnRgTOUSCQz0i git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= git.apache.org/thrift.git v0.0.0-20181101003639-92be4f312b88 h1:km2LQYVCbsEbT7HCJ/nrMopI5CvJUkniCcSf9ZP5+iQ= git.apache.org/thrift.git v0.0.0-20181101003639-92be4f312b88/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895 h1:dmc/C8bpE5VkQn65PNbbyACDC8xw8Hpp/NEurdPmQDQ= github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20181026070331-e7c4bd17b329 h1:WOxkY7ClXANNyQQuq4rxQrM/nQnjXCpvqY0ipYxB9cQ= @@ -21,11 +23,16 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX github.com/Shopify/toxiproxy v2.1.3+incompatible h1:awiJqUYH4q4OmoBiRccJykjd7B+w0loJi2keSna4X/M= github.com/Shopify/toxiproxy v2.1.3+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:CZI8h5fmYwCCvd2RMSsjLqHN6OqABlWJweFKxz4vdEs= +github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:CZI8h5fmYwCCvd2RMSsjLqHN6OqABlWJweFKxz4vdEs= +github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/aws/aws-sdk-go v1.15.31/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= github.com/aws/aws-sdk-go v1.15.68 h1:2+CJhKkxU/ldztVaJ7V5adR1/ZnYl+oc7ufq2Bl18BQ= github.com/aws/aws-sdk-go v1.15.68/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE= +github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/census-instrumentation/opencensus-proto v0.0.2-0.20180913191712-f303ae3f8d6a/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.0.2/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.1.0 h1:VwZ9smxzX8u14/125wHIX7ARV+YhR+L4JADswwxWK0Y= @@ -41,6 +48,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-ini/ini v1.25.4 h1:Mujh4R/dH6YL8bxuISne3xX2+qcQ9p0IxKAP6ExWoUo= github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= @@ -67,6 +76,8 @@ github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/grpc-ecosystem/grpc-gateway v1.5.0 h1:WcmKMm43DR7RdtlkEXQJyo5ws8iTp98CyhCCbOHMvNI= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jaegertracing/jaeger v1.7.0 h1:RVpmOTj7Zb9QRFHI5CYvkYSTR8Cdn/PhM5kBxA4pZBM= @@ -81,12 +92,21 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mitchellh/mapstructure v1.0.0 h1:vVpGvMXJPqSDh2VYHF7gsfQj8Ncx+Xw5Y1KHeTRY+7I= +github.com/mitchellh/mapstructure v1.0.0/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= github.com/openzipkin/zipkin-go v0.1.3 h1:36hTtUTQR/vPX7YVJo2PYexSbHdAJiAkDrjuXw/YlYQ= github.com/openzipkin/zipkin-go v0.1.3/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v0.0.0-20181005164709-635575b42742 h1:wKfigKMTgvSzBLIVvB5QaBBQI0odU6n45/UKSphjLus= @@ -96,16 +116,35 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a h1:AA9vgIBDjMHPC2McaGPojgV2dcI78ZC0TLNhYCXEKH8= +github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a/go.mod h1:lzZQ3Noex5pfAy7mkAeCjcBDteYU85uWWnJ/y6gKU8k= +github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e h1:n/3MEhJQjQxrOUCzh1Y3Re6aJUUWRp2M9+Oc3eVn/54= github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 h1:agujYaXJSxSo18YNX3jzl+4G6Bstwt+kqv47GS12uL0= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg= +github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.2.1 h1:bIcUwXqLseLF3BDAZduuNfekWG87ibtFxi59Bq+oI9M= +github.com/spf13/viper v1.2.1/go.mod h1:P4AexN0a+C9tGAnUFNwDMYYZv3pjFuvmeiMyKRaNVlI= +github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 h1:7z3LSn867ex6VSaahyKadf4WtSsJIgne6A1WLOAGM8A= +github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25/go.mod h1:lbP8tGiBjZ5YWIc2fzuRpTaz0b/53vT6PEs3QuAWzuU= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8= @@ -146,6 +185,7 @@ golang.org/x/oauth2 v0.0.0-20181102170140-232e45548389/go.mod h1:N/0e6XlmueqKjAG golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181031143558-9b800f95dbbc h1:SdCq5U4J+PpbSDIl9bM0V1e1Ug1jsnBkAFvTs1htn7U= diff --git a/internal/collector/jaeger/receiver.go b/internal/collector/jaeger/receiver.go new file mode 100644 index 00000000..f91bb30d --- /dev/null +++ b/internal/collector/jaeger/receiver.go @@ -0,0 +1,63 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package jaegerreceiver wraps the functionality to start the end-point that +// receives Jaeger data sent by the jaeger-agent in jaeger.thrift format over +// TChannel and directly from clients in jaeger.thrift format over binary thrift +// protocol (HTTP transport). +// Note that the UDP transport is not supported since these protocol/transport +// are for task->jaeger-agent communication only and the receiver does not try to +// support jaeger-agent endpoints. +// TODO: add support for the jaeger proto endpoint released in jaeger 1.8package jaegerreceiver +package jaegerreceiver + +import ( + "context" + + "github.com/spf13/viper" + "go.uber.org/zap" + + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + "github.com/census-instrumentation/opencensus-service/receiver/jaeger" +) + +// Run starts the Jaeger receiver endpoint. +func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { + rOpts, err := builder.NewDefaultJaegerReceiverCfg().InitFromViper(v) + if err != nil { + return nil, err + } + + ctx := context.Background() + jtr, err := jaeger.New(ctx, rOpts.ThriftTChannelPort, rOpts.ThriftHTTPPort) + if err != nil { + return nil, err + } + + ss := processor.WrapWithSpanSink("jaeger", spanProc) + if err := jtr.StartTraceReception(ctx, ss); err != nil { + return nil, err + } + + logger.Info("Jaeger receiver is running.", + zap.Int("thrift-tchannel-port", rOpts.ThriftTChannelPort), + zap.Int("thrift-http-port", rOpts.ThriftHTTPPort)) + + closeFn := func() { + jtr.StopTraceReception(context.Background()) + } + + return closeFn, nil +} diff --git a/internal/collector/opencensus/receiver.go b/internal/collector/opencensus/receiver.go new file mode 100644 index 00000000..5571b9da --- /dev/null +++ b/internal/collector/opencensus/receiver.go @@ -0,0 +1,71 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package ocreceiver wraps the functionality to start the end-point that +// receives data directly in the OpenCensus format. +package ocreceiver + +import ( + "fmt" + "net" + "strconv" + "time" + + "github.com/spf13/viper" + "go.opencensus.io/plugin/ocgrpc" + "go.opencensus.io/stats/view" + "go.uber.org/zap" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/internal" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + "github.com/census-instrumentation/opencensus-service/receiver/opencensus/octrace" +) + +// Run starts the OpenCensus receiver endpoint. +func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { + rOpts, err := builder.NewDefaultOpenCensusReceiverCfg().InitFromViper(v) + if err != nil { + return nil, err + } + + grpcSrv := internal.GRPCServerWithObservabilityEnabled() + + if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { + return nil, fmt.Errorf("Failed to register ocgrpc.DefaultServerViews: %v", err) + } + + lis, err := net.Listen("tcp", ":"+strconv.FormatInt(int64(rOpts.Port), 10)) + if err != nil { + return nil, fmt.Errorf("Cannot bind tcp listener to address: %v", err) + } + + ss := processor.WrapWithSpanSink("oc", spanProc) + oci, err := octrace.New(ss, octrace.WithSpanBufferPeriod(1*time.Second)) + if err != nil { + return nil, fmt.Errorf("Failed to create the OpenCensus receiver: %v", err) + } + + agenttracepb.RegisterTraceServiceServer(grpcSrv, oci) + go func() { + if err := grpcSrv.Serve(lis); err != nil { + logger.Error("OpenCensus gRPC shutdown", zap.Error(err)) + } + }() + + logger.Info("OpenCensus receiver is running.", zap.Int("port", rOpts.Port)) + + return grpcSrv.Stop, nil +} diff --git a/internal/collector/processor/doc.go b/internal/collector/processor/doc.go new file mode 100644 index 00000000..6f24e7e0 --- /dev/null +++ b/internal/collector/processor/doc.go @@ -0,0 +1,19 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package processor is the central point on the collector processing: it +// aggregates and performs any operation that applies to all traces in the +// pipeline. Traces reach it after being converted to the OpenCensus protobuf +// format. +package processor diff --git a/internal/collector/processor/exporter_processor.go b/internal/collector/processor/exporter_processor.go new file mode 100644 index 00000000..d54b5ce8 --- /dev/null +++ b/internal/collector/processor/exporter_processor.go @@ -0,0 +1,41 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "context" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + "github.com/census-instrumentation/opencensus-service/exporter" +) + +type exporterSpanProcessor struct { + tes exporter.TraceExporterSink +} + +var _ SpanProcessor = (*exporterSpanProcessor)(nil) + +// NewTraceExporterProcessor creates processor that feeds SpanData to the given trace exporters. +func NewTraceExporterProcessor(traceExporters ...exporter.TraceExporter) SpanProcessor { + return &exporterSpanProcessor{tes: exporter.MultiTraceExporters(traceExporters...)} +} + +func (sp *exporterSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + ack, err := sp.tes.ReceiveSpans(context.Background(), batch.Node, batch.Spans...) + if err != nil { + return ack.DroppedSpans, err + } + return 0, nil +} diff --git a/internal/collector/processor/multi_processor.go b/internal/collector/processor/multi_processor.go new file mode 100644 index 00000000..4f81effe --- /dev/null +++ b/internal/collector/processor/multi_processor.go @@ -0,0 +1,66 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "fmt" + "strings" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" +) + +// MultiSpanProcessor enables processing on multiple processors. +// For each incoming span batch, it calls ProcessSpans method on each span +// processor one-by-one. It aggregates success/failures/errors from all of +// them and reports the result upstream. +type MultiSpanProcessor []SpanProcessor + +var _ SpanProcessor = (*MultiSpanProcessor)(nil) + +// NewMultiSpanProcessor creates a MultiSpanProcessor from the variadic +// list of passed SpanProcessors. +func NewMultiSpanProcessor(procs ...SpanProcessor) MultiSpanProcessor { + return procs +} + +// ProcessSpans implements the SpanProcessor interface +func (msp MultiSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + var maxFailures uint64 + var errors []error + for _, sp := range msp { + failures, err := sp.ProcessSpans(batch, spanFormat) + if err != nil { + errors = append(errors, err) + } + + if failures > maxFailures { + maxFailures = failures + } + } + + var err error + numErrors := len(errors) + if numErrors == 1 { + err = errors[0] + } else if numErrors > 1 { + errMsgs := make([]string, numErrors) + for _, err := range errors { + errMsgs = append(errMsgs, err.Error()) + } + err = fmt.Errorf("[%s]", strings.Join(errMsgs, "; ")) + } + + return maxFailures, err +} diff --git a/internal/collector/processor/multi_processor_test.go b/internal/collector/processor/multi_processor_test.go new file mode 100644 index 00000000..7330fed8 --- /dev/null +++ b/internal/collector/processor/multi_processor_test.go @@ -0,0 +1,144 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "fmt" + "testing" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" +) + +func TestMultiSpanProcessorMultiplexing(t *testing.T) { + processors := make([]SpanProcessor, 3) + for i := range processors { + processors[i] = &mockSpanProcessor{} + } + + tt := NewMultiSpanProcessor(processors...) + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: make([]*tracepb.Span, 7), + } + + var wantSpansCount = 0 + for i := 0; i < 2; i++ { + wantSpansCount += len(batch.Spans) + tt.ProcessSpans(batch, "test") + } + + for _, p := range processors { + m := p.(*mockSpanProcessor) + if m.TotalSpans != wantSpansCount { + t.Errorf("Wanted %d spans for every processor but got %d", wantSpansCount, m.TotalSpans) + return + } + } +} + +func TestMultiSpanProcessorSomeNotOk(t *testing.T) { + processors := make([]SpanProcessor, 3) + for i := range processors { + processors[i] = &mockSpanProcessor{} + } + + // Make one processor return false for some spans + m := processors[1].(*mockSpanProcessor) + wantFailures := uint64(2) + m.Failures = wantFailures + + tt := NewMultiSpanProcessor(processors...) + spans := make([]*tracepb.Span, wantFailures+3) + for i := range spans { + spans[i] = &tracepb.Span{} + } + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: spans, + } + + var wantSpansCount = 0 + for i := 0; i < 2; i++ { + failures, _ := tt.ProcessSpans(batch, "test") + batchSize := len(batch.Spans) + wantSpansCount += batchSize + if wantFailures != failures { + t.Errorf("Wanted %d failures but got %d", wantFailures, failures) + } + } + + for _, p := range processors { + m := p.(*mockSpanProcessor) + if m.TotalSpans != wantSpansCount { + t.Errorf("Wanted %d for every processor but got %d", wantSpansCount, m.TotalSpans) + return + } + } +} + +func TestMultiSpanProcessorWhenOneErrors(t *testing.T) { + processors := make([]SpanProcessor, 3) + for i := range processors { + processors[i] = &mockSpanProcessor{} + } + + // Make one processor return error + m := processors[1].(*mockSpanProcessor) + m.MustFail = true + + tt := NewMultiSpanProcessor(processors...) + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: make([]*tracepb.Span, 5), + } + + var wantSpansCount = 0 + for i := 0; i < 2; i++ { + failures, err := tt.ProcessSpans(batch, "test") + if err == nil { + t.Errorf("Wanted error got nil") + return + } + batchSize := len(batch.Spans) + wantSpansCount += batchSize + if failures != uint64(batchSize) { + t.Errorf("Wanted all spans to fail, got a different value.") + } + } + + for _, p := range processors { + m := p.(*mockSpanProcessor) + if m.TotalSpans != wantSpansCount { + t.Errorf("Wanted %d for every processor but got %d", wantSpansCount, m.TotalSpans) + return + } + } +} + +type mockSpanProcessor struct { + Failures uint64 + TotalSpans int + MustFail bool +} + +var _ SpanProcessor = &mockSpanProcessor{} + +func (p *mockSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + batchSize := len(batch.Spans) + p.TotalSpans += batchSize + if p.MustFail { + return uint64(batchSize), fmt.Errorf("this processor must fail") + } + + return p.Failures, nil +} diff --git a/internal/collector/processor/options.go b/internal/collector/processor/options.go new file mode 100644 index 00000000..37de9129 --- /dev/null +++ b/internal/collector/processor/options.go @@ -0,0 +1,110 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "time" + + "go.uber.org/zap" +) + +const ( + // DefaultNumWorkers is the default number of workers consuming from the processor queue + DefaultNumWorkers = 10 + // DefaultQueueSize is the default maximum number of span batches allowed in the processor's queue + DefaultQueueSize = 1000 +) + +type options struct { + logger *zap.Logger + name string + numWorkers int + queueSize int + backoffDelay time.Duration + extraFormatTypes []string + retryOnProcessingFailure bool +} + +// Option is a function that sets some option on the component. +type Option func(c *options) + +// Options is a factory for all available Option's +var Options options + +// WithLogger creates a Option that initializes the logger +func (options) WithLogger(logger *zap.Logger) Option { + return func(b *options) { + b.logger = logger + } +} + +// WithName creates an Option that initializes the name of the processor +func (options) WithName(name string) Option { + return func(b *options) { + b.name = name + } +} + +// WithNumWorkers creates an Option that initializes the number of queue consumers AKA workers +func (options) WithNumWorkers(numWorkers int) Option { + return func(b *options) { + b.numWorkers = numWorkers + } +} + +// WithQueueSize creates an Option that initializes the queue size +func (options) WithQueueSize(queueSize int) Option { + return func(b *options) { + b.queueSize = queueSize + } +} + +// WithBackoffDelay creates an Option that initializes the backoff delay +func (options) WithBackoffDelay(backoffDelay time.Duration) Option { + return func(b *options) { + b.backoffDelay = backoffDelay + } +} + +// WithExtraFormatTypes creates an Option that initializes the extra list of format types +func (options) WithExtraFormatTypes(extraFormatTypes []string) Option { + return func(b *options) { + b.extraFormatTypes = extraFormatTypes + } +} + +// WithRetryOnProcessingFailures creates an Option that initializes the retryOnProcessingFailure boolean +func (options) WithRetryOnProcessingFailures(retryOnProcessingFailure bool) Option { + return func(b *options) { + b.retryOnProcessingFailure = retryOnProcessingFailure + } +} + +func (o options) apply(opts ...Option) options { + ret := options{} + for _, opt := range opts { + opt(&ret) + } + if ret.logger == nil { + ret.logger = zap.NewNop() + } + if ret.numWorkers == 0 { + ret.numWorkers = DefaultNumWorkers + } + if ret.queueSize == 0 { + ret.queueSize = DefaultQueueSize + } + return ret +} diff --git a/internal/collector/processor/processor.go b/internal/collector/processor/processor.go new file mode 100644 index 00000000..e91f1ed3 --- /dev/null +++ b/internal/collector/processor/processor.go @@ -0,0 +1,47 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "go.uber.org/zap" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" +) + +// SpanProcessor handles batches of spans converted to OpenCensus proto format. +type SpanProcessor interface { + // ProcessSpans processes spans and return with the number of spans that failed and an error. + ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) + // TODO: (@pjanotti) For shutdown improvement, the interface needs a method to attempt that. +} + +// An initial processor that does not sends the data to any destination but helps debugging. +type debugSpanProcessor struct{ logger *zap.Logger } + +var _ SpanProcessor = (*debugSpanProcessor)(nil) + +func (sp *debugSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + if batch.Node == nil { + sp.logger.Warn("Received batch with nil Node", zap.String("format", spanFormat)) + } + + sp.logger.Debug("debugSpanProcessor", zap.String("originalFormat", spanFormat), zap.Int("#spans", len(batch.Spans))) + return 0, nil +} + +// NewNoopSpanProcessor creates an OC SpanProcessor that just drops the received data. +func NewNoopSpanProcessor(logger *zap.Logger) SpanProcessor { + return &debugSpanProcessor{logger: logger} +} diff --git a/internal/collector/processor/processor_to_sink.go b/internal/collector/processor/processor_to_sink.go new file mode 100644 index 00000000..e31e1128 --- /dev/null +++ b/internal/collector/processor/processor_to_sink.go @@ -0,0 +1,55 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "context" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "github.com/census-instrumentation/opencensus-service/spansink" +) + +type protoProcessorSink struct { + sourceFormat string + protoProcessor SpanProcessor +} + +var _ (spansink.Sink) = (*protoProcessorSink)(nil) + +// WrapWithSpanSink wraps a processor to be used as a span sink by receivers. +func WrapWithSpanSink(format string, p SpanProcessor) spansink.Sink { + return &protoProcessorSink{ + sourceFormat: format, + protoProcessor: p, + } +} + +func (ps *protoProcessorSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { + batch := &agenttracepb.ExportTraceServiceRequest{ + Node: node, + Spans: spans, + } + + failures, err := ps.protoProcessor.ProcessSpans(batch, ps.sourceFormat) + + ack := &spansink.Acknowledgement{ + SavedSpans: uint64(len(batch.Spans)) - failures, + DroppedSpans: failures, + } + + return ack, err +} diff --git a/internal/collector/processor/queued_processor.go b/internal/collector/processor/queued_processor.go new file mode 100644 index 00000000..98563e5b --- /dev/null +++ b/internal/collector/processor/queued_processor.go @@ -0,0 +1,144 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "sync" + "time" + + "github.com/jaegertracing/jaeger/pkg/queue" + "go.uber.org/zap" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" +) + +type queuedSpanProcessor struct { + queue *queue.BoundedQueue + logger *zap.Logger + sender SpanProcessor + numWorkers int + retryOnProcessingFailure bool + backoffDelay time.Duration + stopCh chan struct{} + stopOnce sync.Once +} + +var _ SpanProcessor = (*queuedSpanProcessor)(nil) + +type queueItem struct { + queuedTime time.Time + batch *agenttracepb.ExportTraceServiceRequest + spanFormat string +} + +// NewQueuedSpanProcessor returns a span processor that maintains a bounded +// in-memory queue of span batches, and sends out span batches using the +// provided sender +func NewQueuedSpanProcessor(sender SpanProcessor, opts ...Option) SpanProcessor { + sp := newQueuedSpanProcessor(sender, opts...) + + sp.queue.StartConsumers(sp.numWorkers, func(item interface{}) { + value := item.(*queueItem) + sp.processItemFromQueue(value) + }) + + return sp +} + +func newQueuedSpanProcessor(sender SpanProcessor, opts ...Option) *queuedSpanProcessor { + options := Options.apply(opts...) + boundedQueue := queue.NewBoundedQueue(options.queueSize, func(item interface{}) {}) + return &queuedSpanProcessor{ + queue: boundedQueue, + logger: options.logger, + numWorkers: options.numWorkers, + sender: sender, + retryOnProcessingFailure: options.retryOnProcessingFailure, + backoffDelay: options.backoffDelay, + stopCh: make(chan struct{}), + } +} + +// Stop halts the span processor and all its goroutines. +func (sp *queuedSpanProcessor) Stop() { + sp.stopOnce.Do(func() { + close(sp.stopCh) + sp.queue.Stop() + }) +} + +// ProcessSpans implements the SpanProcessor interface +func (sp *queuedSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (failures uint64, err error) { + allAdded := sp.enqueueSpanBatch(batch, spanFormat) + if !allAdded { + failures = uint64(len(batch.Spans)) + } + return +} + +func (sp *queuedSpanProcessor) enqueueSpanBatch(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) bool { + item := &queueItem{ + queuedTime: time.Now(), + batch: batch, + spanFormat: spanFormat, + } + addedToQueue := sp.queue.Produce(item) + if !addedToQueue { + sp.onItemDropped(item) + } + return addedToQueue +} + +func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) { + // TODO: @(pjanotti) metrics: startTime := time.Now() + // TODO: + _, err := sp.sender.ProcessSpans(item.batch, item.spanFormat) + if err != nil { + batchSize := len(item.batch.Spans) + if !sp.retryOnProcessingFailure { + // throw away the batch + sp.logger.Error("Failed to process batch, discarding", zap.Int("batch-size", batchSize)) + sp.onItemDropped(item) + } else { + // TODO: (@pjanotti) do not put it back on the end of the queue, retry with it directly. + // This will have the benefit of keeping the batch closer to related ones in time. + if !sp.queue.Produce(item) { + sp.logger.Error("Failed to process batch and failed to re-enqueue", zap.Int("batch-size", batchSize)) + sp.onItemDropped(item) + } else { + sp.logger.Warn("Failed to process batch, re-enqueued", zap.Int("batch-size", batchSize)) + } + } + // back-off for configured delay, but get interrupted when shutting down + if sp.backoffDelay > 0 { + sp.logger.Warn("Backing off before next attempt", + zap.Duration("backoff-delay", sp.backoffDelay)) + select { + case <-sp.stopCh: + sp.logger.Info("Interrupted due to shutdown") + break + case <-time.After(sp.backoffDelay): + sp.logger.Info("Resume processing") + break + } + } + } +} + +func (sp *queuedSpanProcessor) onItemDropped(item *queueItem) { + sp.logger.Warn("Span batch dropped", + zap.Int("#spans", len(item.batch.Spans)), + zap.String("spanSource", item.spanFormat)) +} diff --git a/internal/collector/processor/queued_processor_test.go b/internal/collector/processor/queued_processor_test.go new file mode 100644 index 00000000..1443a8e4 --- /dev/null +++ b/internal/collector/processor/queued_processor_test.go @@ -0,0 +1,82 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "sync" + "sync/atomic" + "testing" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" +) + +func TestQueueProcessorHappyPath(t *testing.T) { + mockProc := newMockConcurrentSpanProcessor() + qp := NewQueuedSpanProcessor(mockProc) + goFn := func(b *agenttracepb.ExportTraceServiceRequest) { + qp.ProcessSpans(b, "test") + } + + spans := []*tracepb.Span{{}} + wantBatches := 10 + wantSpans := 0 + for i := 0; i < wantBatches; i++ { + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: spans, + } + wantSpans += len(spans) + spans = append(spans, &tracepb.Span{}) + fn := func() { goFn(batch) } + mockProc.runConcurrently(fn) + } + + // Wait until all batches received + mockProc.awaitAsyncProcessing() + + if wantBatches != int(mockProc.batchCount) { + t.Fatalf("Wanted %d batches, got %d", wantBatches, mockProc.batchCount) + } + if wantSpans != int(mockProc.spanCount) { + t.Fatalf("Wanted %d spans, got %d", wantSpans, mockProc.spanCount) + } +} + +type mockConcurrentSpanProcessor struct { + waitGroup *sync.WaitGroup + batchCount int32 + spanCount int32 +} + +var _ SpanProcessor = (*mockConcurrentSpanProcessor)(nil) + +func (p *mockConcurrentSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + atomic.AddInt32(&p.batchCount, 1) + atomic.AddInt32(&p.spanCount, int32(len(batch.Spans))) + p.waitGroup.Done() + return 0, nil +} + +func newMockConcurrentSpanProcessor() *mockConcurrentSpanProcessor { + return &mockConcurrentSpanProcessor{waitGroup: new(sync.WaitGroup)} +} +func (p *mockConcurrentSpanProcessor) runConcurrently(fn func()) { + p.waitGroup.Add(1) + go fn() +} + +func (p *mockConcurrentSpanProcessor) awaitAsyncProcessing() { + p.waitGroup.Wait() +} diff --git a/internal/collector/zipkin/receiver.go b/internal/collector/zipkin/receiver.go new file mode 100644 index 00000000..69a223db --- /dev/null +++ b/internal/collector/zipkin/receiver.go @@ -0,0 +1,66 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package zipkinreceiver wraps the functionality to start the end-point that +// receives Zipkin traces. +package zipkinreceiver + +import ( + "fmt" + "net" + "net/http" + "strconv" + + "github.com/spf13/viper" + "go.uber.org/zap" + + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + zr "github.com/census-instrumentation/opencensus-service/receiver/zipkin" +) + +// Run starts the Zipkin receiver endpoint. +func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { + rOpts, err := builder.NewDefaultZipkinReceiverCfg().InitFromViper(v) + if err != nil { + return nil, err + } + + // TODO: (@pjanotti) when Zipkin implementation of StartTraceReceiver is working, change this code (this temporarily). + ss := processor.WrapWithSpanSink("zipkin", spanProc) + addr := ":" + strconv.FormatInt(int64(rOpts.Port), 10) + zi, err := zr.New(ss) + if err != nil { + return nil, fmt.Errorf("Failed to create the Zipkin receiver: %v", err) + } + + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("Cannot bind Zipkin receiver to address %q: %v", addr, err) + } + mux := http.NewServeMux() + mux.Handle("/api/v2/spans", zi) + go func() { + if err := http.Serve(ln, mux); err != nil { + logger.Fatal("Failed to serve the Zipkin receiver: %v", zap.Error(err)) + } + }() + + logger.Info("Zipkin receiver is running.", zap.Int("port", rOpts.Port)) + + doneFn := func() { + ln.Close() + } + return doneFn, nil +}