From 4f3bb2c88d38b1af99555c5413b9d0d235efe062 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Fri, 16 Nov 2018 16:13:24 -0800 Subject: [PATCH] Add processor queue (#197) * Start adding pipeline to collector This is the basis to do the work for actually implementing OC collector in the short run. At first enable receivers: 1. OC Receiver 2. Jaeger 3. Zipkin The queue processor is added but not connected to exporters by default, although there are options to do so. As previously indicated before there is some duplication of code with the agent, this should be reduced as we implement the interfaces and project coalesce around certain patterns. The queue is missing its metrics and those will be added soon. --- CONTRIBUTING.md | 10 + README.md | 24 +- cmd/occollector/app/builder/builder.go | 138 +++++++++ cmd/occollector/app/builder/builder_test.go | 80 ++++++ .../builder/testdata/receivers_disabled.yaml | 5 + .../builder/testdata/receivers_enabled.yaml | 5 + cmd/occollector/app/collector/collector.go | 268 ++++++++++++++++++ cmd/occollector/main.go | 79 +----- go.mod | 7 + go.sum | 40 +++ internal/collector/jaeger/receiver.go | 63 ++++ internal/collector/opencensus/receiver.go | 71 +++++ internal/collector/processor/doc.go | 19 ++ .../collector/processor/exporter_processor.go | 41 +++ .../collector/processor/multi_processor.go | 66 +++++ .../processor/multi_processor_test.go | 144 ++++++++++ internal/collector/processor/options.go | 110 +++++++ internal/collector/processor/processor.go | 47 +++ .../collector/processor/processor_to_sink.go | 55 ++++ .../collector/processor/queued_processor.go | 144 ++++++++++ .../processor/queued_processor_test.go | 82 ++++++ internal/collector/zipkin/receiver.go | 66 +++++ 22 files changed, 1485 insertions(+), 79 deletions(-) create mode 100644 cmd/occollector/app/builder/builder.go create mode 100644 cmd/occollector/app/builder/builder_test.go create mode 100644 cmd/occollector/app/builder/testdata/receivers_disabled.yaml create mode 100644 cmd/occollector/app/builder/testdata/receivers_enabled.yaml create mode 100644 cmd/occollector/app/collector/collector.go create mode 100644 internal/collector/jaeger/receiver.go create mode 100644 internal/collector/opencensus/receiver.go create mode 100644 internal/collector/processor/doc.go create mode 100644 internal/collector/processor/exporter_processor.go create mode 100644 internal/collector/processor/multi_processor.go create mode 100644 internal/collector/processor/multi_processor_test.go create mode 100644 internal/collector/processor/options.go create mode 100644 internal/collector/processor/processor.go create mode 100644 internal/collector/processor/processor_to_sink.go create mode 100644 internal/collector/processor/queued_processor.go create mode 100644 internal/collector/processor/queued_processor_test.go create mode 100644 internal/collector/zipkin/receiver.go diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8b41cba0..0bf13ced 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -53,3 +53,13 @@ $ git checkout -b feature $ git commit $ git push fork feature ``` + +## General Notes + +This project uses go 1.11 and Travis for CI. + +Travis CI uses the Makefile with the default target, it is recommended to +run it before submitting your PR. It runs `gofmt -s` (simplify) and `golint`. + +The dependencies are managed with `go mod` if you work with the sources under your +`$GOPATH` you need to set the environment variable `GO111MODULE=on`. \ No newline at end of file diff --git a/README.md b/README.md index c4a1389a..c1302a17 100644 --- a/README.md +++ b/README.md @@ -292,20 +292,38 @@ The collector is in its initial development stages. It can be run directly from sources, binary, or a Docker image. 1. Run from sources: -``` +```shell $ go run github.com/census-instrumentation/opencensus-service/cmd/occollector ``` 2. Run from binary (from the root of your repo): -``` +```shell $ make collector $ ./bin/occollector_$($GOOS) ``` 3. Build a Docker scratch image and use the appropria Docker command for your scenario: -``` +```shell $ make docker-collector $ docker run --rm -it -p 55678:55678 occollector ``` +4. It can be configured via command-line or config file: +```shell +OpenCensus Collector + +Usage: + occollector [flags] + +Flags: + --add-queued-processor Flag to wrap one processor with the queued processor (flag will be remove soon, dev helper) + --config string Path to the config file + -h, --help help for occollector + --log-level string Output level of logs (TRACE, DEBUG, INFO, WARN, ERROR, FATAL) (default "INFO") + --noop-processor Flag to add the no-op processor (combine with log level DEBUG to log incoming spans) + --receive-jaeger Flag to run the Jaeger receiver (i.e.: Jaeger Collector), default settings: {ThriftTChannelPort:14267 ThriftHTTPPort:14268} + --receive-oc-trace Flag to run the OpenCensus trace receiver, default settings: {Port:55678} + --receive-zipkin Flag to run the Zipkin receiver, default settings: {Port:9411} +``` + [travis-image]: https://travis-ci.org/census-instrumentation/opencensus-service.svg?branch=master [travis-url]: https://travis-ci.org/census-instrumentation/opencensus-service [godoc-image]: https://godoc.org/github.com/census-instrumentation/opencensus-service?status.svg diff --git a/cmd/occollector/app/builder/builder.go b/cmd/occollector/app/builder/builder.go new file mode 100644 index 00000000..de8e753c --- /dev/null +++ b/cmd/occollector/app/builder/builder.go @@ -0,0 +1,138 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package builder handles the options to build the OpenCensus collector +// pipeline. +package builder + +import ( + "fmt" + "strings" + + "github.com/spf13/viper" +) + +const ( + receiversRoot = "receivers" + jaegerEntry = "jaeger" + opencensusEntry = "opencensus" + zipkinEntry = "zipkin" +) + +// JaegerReceiverCfg holds configuration for Jaeger receivers. +type JaegerReceiverCfg struct { + // ThriftTChannelPort is the port that the relay receives on for jaeger thrift tchannel requests + ThriftTChannelPort int `mapstructure:"thrift-tchannel-port"` + // ThriftHTTPPort is the port that the relay receives on for jaeger thrift http requests + ThriftHTTPPort int `mapstructure:"thrift-http-port"` +} + +// JaegerReceiverEnabled checks if the Jaeger receiver is enabled, via a command-line flag, environment +// variable, or configuration file. +func JaegerReceiverEnabled(v *viper.Viper, cmdFlag string) bool { + return featureEnabled(v, cmdFlag, receiversRoot, jaegerEntry) +} + +// NewDefaultJaegerReceiverCfg returns an instance of JaegerReceiverCfg with default values +func NewDefaultJaegerReceiverCfg() *JaegerReceiverCfg { + opts := &JaegerReceiverCfg{ + ThriftTChannelPort: 14267, + ThriftHTTPPort: 14268, + } + return opts +} + +// InitFromViper returns a JaegerReceiverCfg according to the configuration. +func (cfg *JaegerReceiverCfg) InitFromViper(v *viper.Viper) (*JaegerReceiverCfg, error) { + return cfg, initFromViper(cfg, v, receiversRoot, jaegerEntry) +} + +// OpenCensusReceiverCfg holds configuration for OpenCensus receiver. +type OpenCensusReceiverCfg struct { + // Port is the port that the receiver will use + Port int `mapstructure:"port"` +} + +// OpenCensusReceiverEnabled checks if the OpenCensus receiver is enabled, via a command-line flag, environment +// variable, or configuration file. +func OpenCensusReceiverEnabled(v *viper.Viper, cmdFlag string) bool { + return featureEnabled(v, cmdFlag, receiversRoot, opencensusEntry) +} + +// NewDefaultOpenCensusReceiverCfg returns an instance of OpenCensusReceiverCfg with default values +func NewDefaultOpenCensusReceiverCfg() *OpenCensusReceiverCfg { + opts := &OpenCensusReceiverCfg{ + Port: 55678, + } + return opts +} + +// InitFromViper returns a OpenCensusReceiverCfg according to the configuration. +func (cfg *OpenCensusReceiverCfg) InitFromViper(v *viper.Viper) (*OpenCensusReceiverCfg, error) { + return cfg, initFromViper(cfg, v, receiversRoot, opencensusEntry) +} + +// ZipkinReceiverCfg holds configuration for Zipkin receiver. +type ZipkinReceiverCfg struct { + // Port is the port that the receiver will use + Port int `mapstructure:"port"` +} + +// ZipkinReceiverEnabled checks if the Zipkin receiver is enabled, via a command-line flag, environment +// variable, or configuration file. +func ZipkinReceiverEnabled(v *viper.Viper, cmdFlag string) bool { + return featureEnabled(v, cmdFlag, receiversRoot, zipkinEntry) +} + +// NewDefaultZipkinReceiverCfg returns an instance of ZipkinReceiverCfg with default values +func NewDefaultZipkinReceiverCfg() *ZipkinReceiverCfg { + opts := &ZipkinReceiverCfg{ + Port: 9411, + } + return opts +} + +// InitFromViper returns a ZipkinReceiverCfg according to the configuration. +func (cfg *ZipkinReceiverCfg) InitFromViper(v *viper.Viper) (*ZipkinReceiverCfg, error) { + return cfg, initFromViper(cfg, v, receiversRoot, zipkinEntry) +} + +// Helper functions + +func initFromViper(cfg interface{}, v *viper.Viper, labels ...string) error { + v = getViperSub(v, labels...) + if v == nil { + return nil + } + if err := v.Unmarshal(cfg); err != nil { + return fmt.Errorf("Failed to read configuration for %s %v", strings.Join(labels, ": "), err) + } + + return nil +} + +func getViperSub(v *viper.Viper, labels ...string) *viper.Viper { + for _, label := range labels { + v = v.Sub(label) + if v == nil { + return nil + } + } + + return v +} + +func featureEnabled(v *viper.Viper, cmdFlag string, labels ...string) bool { + return v.GetBool(cmdFlag) || (getViperSub(v, labels...) != nil) +} diff --git a/cmd/occollector/app/builder/builder_test.go b/cmd/occollector/app/builder/builder_test.go new file mode 100644 index 00000000..4f19c1eb --- /dev/null +++ b/cmd/occollector/app/builder/builder_test.go @@ -0,0 +1,80 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "reflect" + "testing" + + "github.com/spf13/viper" +) + +func TestReceiversEnabledByPresenceWithDefaultSettings(t *testing.T) { + v, err := loadViperFromFile("./testdata/receivers_enabled.yaml") + if err != nil { + t.Fatalf("Failed to load viper from test file: %v", err) + } + + jaegerEnabled, opencensusEnabled, zipkinEnabled := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z") + if !jaegerEnabled || !opencensusEnabled || !zipkinEnabled { + t.Fatalf("Some of the expected receivers were not enabled j:%v oc:%v z:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled) + } + + wj := NewDefaultJaegerReceiverCfg() + gj, err := wj.InitFromViper(v) + if err != nil { + t.Errorf("Failed to InitFromViper for Jaeger receiver: %v", err) + } else if !reflect.DeepEqual(wj, gj) { + t.Errorf("Incorrect config for Jaeger receiver, want %v got %v", wj, gj) + } + + woc := NewDefaultOpenCensusReceiverCfg() + goc, err := woc.InitFromViper(v) + if err != nil { + t.Errorf("Failed to InitFromViper for OpenCensus receiver: %v", err) + } else if !reflect.DeepEqual(woc, goc) { + t.Errorf("Incorrect config for OpenCensus receiver, want %v got %v", woc, goc) + } + + wz := NewDefaultZipkinReceiverCfg() + gz, err := wz.InitFromViper(v) + if err != nil { + t.Errorf("Failed to InitFromViper for Zipkin receiver: %v", err) + } else if !reflect.DeepEqual(wz, gz) { + t.Errorf("Incorrect config for Zipkin receiver, want %v got %v", wz, gz) + } +} + +func TestReceiversDisabledByPresenceWithDefaultSettings(t *testing.T) { + v, err := loadViperFromFile("./testdata/receivers_disabled.yaml") + if err != nil { + t.Fatalf("Failed to load viper from test file: %v", err) + } + + jaegerEnabled, opencensusEnabled, zipkinEnabled := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z") + if jaegerEnabled || opencensusEnabled || zipkinEnabled { + t.Fatalf("Not all receivers were disabled j:%v oc:%v z:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled) + } +} + +func loadViperFromFile(file string) (*viper.Viper, error) { + v := viper.New() + v.SetConfigFile(file) + err := v.ReadInConfig() + if err != nil { + return nil, err + } + return v, nil +} diff --git a/cmd/occollector/app/builder/testdata/receivers_disabled.yaml b/cmd/occollector/app/builder/testdata/receivers_disabled.yaml new file mode 100644 index 00000000..249d7d75 --- /dev/null +++ b/cmd/occollector/app/builder/testdata/receivers_disabled.yaml @@ -0,0 +1,5 @@ +# No receivers enabled, all commented out +receivers: + # jaeger: {} + # opencensus: {} + # zipkin: {} diff --git a/cmd/occollector/app/builder/testdata/receivers_enabled.yaml b/cmd/occollector/app/builder/testdata/receivers_enabled.yaml new file mode 100644 index 00000000..1287369a --- /dev/null +++ b/cmd/occollector/app/builder/testdata/receivers_enabled.yaml @@ -0,0 +1,5 @@ +# Enable receivers with default configuration +receivers: + jaeger: {} + opencensus: {} + zipkin: {} diff --git a/cmd/occollector/app/collector/collector.go b/cmd/occollector/app/collector/collector.go new file mode 100644 index 00000000..dc3cae6b --- /dev/null +++ b/cmd/occollector/app/collector/collector.go @@ -0,0 +1,268 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package collector handles the command-line, configuration, and runs the OC collector. +package collector + +import ( + "fmt" + "io/ioutil" + "log" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/exporter" + "github.com/census-instrumentation/opencensus-service/exporter/exporterparser" + "github.com/census-instrumentation/opencensus-service/internal/collector/jaeger" + "github.com/census-instrumentation/opencensus-service/internal/collector/opencensus" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + "github.com/census-instrumentation/opencensus-service/internal/collector/zipkin" +) + +const ( + configCfg = "config" + logLevelCfg = "log-level" + jaegerReceiverFlg = "receive-jaeger" + ocReceiverFlg = "receive-oc-trace" + zipkinReceiverFlg = "receive-zipkin" + debugProcessorFlg = "debug-processor" + queuedProcessorFlg = "add-queued-processor" // TODO: (@pjanotti) this is temporary flag until it can be read from config. +) + +var ( + config string + + v = viper.New() + + logger *zap.Logger + + rootCmd = &cobra.Command{ + Use: "occollector", + Long: "OpenCensus Collector", + Run: func(cmd *cobra.Command, args []string) { + if file := v.GetString(configCfg); file != "" { + v.SetConfigFile(file) + err := v.ReadInConfig() + if err != nil { + log.Fatalf("Error loading config file %q: %v", file, err) + return + } + } + + execute() + }, + } +) + +func init() { + rootCmd.PersistentFlags().String(logLevelCfg, "INFO", "Output level of logs (TRACE, DEBUG, INFO, WARN, ERROR, FATAL)") + v.BindPFlag(logLevelCfg, rootCmd.PersistentFlags().Lookup(logLevelCfg)) + + // local flags + rootCmd.Flags().StringVar(&config, configCfg, "", "Path to the config file") + rootCmd.Flags().Bool(jaegerReceiverFlg, false, + fmt.Sprintf("Flag to run the Jaeger receiver (i.e.: Jaeger Collector), default settings: %+v", *builder.NewDefaultJaegerReceiverCfg())) + rootCmd.Flags().Bool(ocReceiverFlg, false, + fmt.Sprintf("Flag to run the OpenCensus trace receiver, default settings: %+v", *builder.NewDefaultOpenCensusReceiverCfg())) + rootCmd.Flags().Bool(zipkinReceiverFlg, false, + fmt.Sprintf("Flag to run the Zipkin receiver, default settings: %+v", *builder.NewDefaultZipkinReceiverCfg())) + rootCmd.Flags().Bool(debugProcessorFlg, false, "Flag to add a debug processor (combine with log level DEBUG to log incoming spans)") + rootCmd.Flags().Bool(queuedProcessorFlg, false, "Flag to wrap one processor with the queued processor (flag will be remove soon, dev helper)") + + // TODO: (@pjanotti) add builder options as flags, before calls bellow. Likely it will require code re-org. + + v.AutomaticEnv() + v.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "_")) + v.BindPFlags(rootCmd.Flags()) +} + +func newLogger() (*zap.Logger, error) { + var level zapcore.Level + err := (&level).UnmarshalText([]byte(v.GetString(logLevelCfg))) + if err != nil { + return nil, err + } + conf := zap.NewProductionConfig() + conf.Level.SetLevel(level) + return conf.Build() +} + +func execute() { + var signalsChannel = make(chan os.Signal) + signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) + + var err error + logger, err = newLogger() + if err != nil { + log.Fatalf("Failed to get logger: %v", err) + } + + logger.Info("Starting...") + + // Build pipeline from its end: 1st exporters, the OC-proto queue processor, and + // finally the receivers. + + var closeFns []func() + var spanProcessors []processor.SpanProcessor + exportersCloseFns, exporters := createExporters() + closeFns = append(closeFns, exportersCloseFns...) + if len(exporters) > 0 { + // Exporters need an extra hop from OC-proto to span data: to workaround that for now + // we will use a special processor that transforms the data to a format that they can consume. + // TODO: (@pjanotti) we should avoid this step in the long run, its an extra hop just to re-use + // the exporters: this can lose node information and it is not ideal for performance and delegates + // the retry/buffering to the exporters (that are designed to run within the tracing process). + spanProcessors = append(spanProcessors, processor.NewTraceExporterProcessor(exporters...)) + } + + if v.GetBool(debugProcessorFlg) { + spanProcessors = append(spanProcessors, processor.NewNoopSpanProcessor(logger)) + } + + if len(spanProcessors) == 0 { + logger.Warn("Nothing to do: no processor was enabled. Shutting down.") + os.Exit(1) + } + + // TODO: (@pjanotti) construct queued processor from config options, for now just to exercise it, wrap one around + // the first span processor available. + if v.GetBool(queuedProcessorFlg) { + spanProcessors[0] = processor.NewQueuedSpanProcessor(spanProcessors[0]) + } + + // Wraps processors in a single one to be connected to all enabled receivers. + spanProcessor := processor.NewMultiSpanProcessor(spanProcessors...) + + receiversCloseFns := createReceivers(spanProcessor) + closeFns = append(closeFns, receiversCloseFns...) + + logger.Info("Collector is up and running.") + + <-signalsChannel + logger.Info("Starting shutdown...") + + // TODO: orderly shutdown: first receivers, then flushing pipelines giving + // senders a chance to send all their data. This may take time, the allowed + // time should be part of configuration. + for i := len(closeFns) - 1; i > 0; i-- { + closeFns[i]() + } + + logger.Info("Shutdown complete.") +} + +func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporter) { + // TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility. + parseFns := []struct { + name string + fn func([]byte) ([]exporter.TraceExporter, []func() error, error) + }{ + {name: "datadog", fn: exporterparser.DatadogTraceExportersFromYAML}, + {name: "stackdriver", fn: exporterparser.StackdriverTraceExportersFromYAML}, + {name: "zipkin", fn: exporterparser.ZipkinExportersFromYAML}, + {name: "jaeger", fn: exporterparser.JaegerExportersFromYAML}, + {name: "kafka", fn: exporterparser.KafkaExportersFromYAML}, + } + + if config == "" { + logger.Info("No config file, exporters can be only configured via the file.") + return + } + + cfgBlob, err := ioutil.ReadFile(config) + if err != nil { + logger.Fatal("Cannot read config file for exporters", zap.Error(err)) + } + + for _, cfg := range parseFns { + tes, tesDoneFns, err := cfg.fn(cfgBlob) + if err != nil { + logger.Fatal("Failed to create config for exporter", zap.String("exporter", cfg.name), zap.Error(err)) + } + + wasEnabled := false + for _, te := range tes { + if te != nil { + wasEnabled = true + traceExporters = append(traceExporters, te) + } + } + + for _, tesDoneFn := range tesDoneFns { + if tesDoneFn != nil { + wrapperFn := func() { + if err := tesDoneFn(); err != nil { + logger.Warn("Error when closing exporter", zap.String("exporter", cfg.name), zap.Error(err)) + } + } + doneFns = append(doneFns, wrapperFn) + } + } + + if wasEnabled { + logger.Info("Exporter enabled", zap.String("exporter", cfg.name)) + } + } + + return doneFns, traceExporters +} + +func createReceivers(spanProcessor processor.SpanProcessor) (closeFns []func()) { + var someReceiverEnabled bool + receivers := []struct { + name string + runFn func(*zap.Logger, *viper.Viper, processor.SpanProcessor) (func(), error) + enabled bool + }{ + {"Jaeger", jaegerreceiver.Run, builder.JaegerReceiverEnabled(v, jaegerReceiverFlg)}, + {"OpenCensus", ocreceiver.Run, builder.OpenCensusReceiverEnabled(v, ocReceiverFlg)}, + {"Zipkin", zipkinreceiver.Run, builder.ZipkinReceiverEnabled(v, zipkinReceiverFlg)}, + } + + for _, receiver := range receivers { + if receiver.enabled { + closeSrv, err := receiver.runFn(logger, v, spanProcessor) + if err != nil { + // TODO: (@pjanotti) better shutdown, for now just try to stop any started receiver before terminating. + for _, closeFn := range closeFns { + closeFn() + } + logger.Fatal("Cannot run receiver for "+receiver.name, zap.Error(err)) + } + closeFns = append(closeFns, closeSrv) + someReceiverEnabled = true + } + } + + if !someReceiverEnabled { + logger.Warn("Nothing to do: no receiver was enabled. Shutting down.") + os.Exit(1) + } + + return closeFns +} + +// Execute the application according to the command and configuration given +// by the user. +func Execute() error { + return rootCmd.Execute() +} diff --git a/cmd/occollector/main.go b/cmd/occollector/main.go index 1e8a12f9..8b12f534 100644 --- a/cmd/occollector/main.go +++ b/cmd/occollector/main.go @@ -20,86 +20,13 @@ package main import ( - "context" - "fmt" "log" - "os" - "os/signal" - "syscall" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - "github.com/census-instrumentation/opencensus-service/receiver/opencensus" - "github.com/census-instrumentation/opencensus-service/spansink" - - "go.opencensus.io/plugin/ocgrpc" - "go.opencensus.io/stats/view" - "go.uber.org/zap" -) - -const ( - defaultOCAddress = ":55678" + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/collector" ) func main() { - var signalsChannel = make(chan os.Signal) - signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) - - // TODO: Allow configuration of logger and other items such as servers, etc. - logger, err := zap.NewProduction() - if err != nil { - log.Fatalf("Failed to get production logger: %v", err) - } - - logger.Info("Starting...") - - closeSrv, err := runOCServerWithReceiver(defaultOCAddress, logger) - if err != nil { - logger.Fatal("Cannot run opencensus server", zap.String("Address", defaultOCAddress), zap.Error(err)) - } - - logger.Info("Collector is up and running.") - - <-signalsChannel - logger.Info("Starting shutdown...") - - // TODO: orderly shutdown: first receivers, then flushing pipelines giving - // senders a chance to send all their data. This may take time, the allowed - // time should be part of configuration. - closeSrv() - - logger.Info("Shutdown complete.") -} - -func runOCServerWithReceiver(addr string, logger *zap.Logger) (func() error, error) { - if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { - return nil, fmt.Errorf("Failed to register ocgrpc.DefaultServerViews: %v", err) - } - - sr := &fakeSpanSink{ - logger: logger, + if err := collector.Execute(); err != nil { + log.Fatalf("Failed to run the collector: %v", err) } - - ocr, err := opencensus.New(addr) - if err != nil { - return nil, fmt.Errorf("Failed to create the OpenCensus receiver: %v", err) - } - if err := ocr.StartTraceReception(context.Background(), sr); err != nil { - return nil, fmt.Errorf("Failed to start OpenCensus TraceReceiver : %v", err) - } - return ocr.Stop, nil -} - -type fakeSpanSink struct { - logger *zap.Logger -} - -func (sr *fakeSpanSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { - ack := &spansink.Acknowledgement{ - SavedSpans: uint64(len(spans)), - } - - sr.logger.Info("ReceivedSpans", zap.Uint64("Received spans", ack.SavedSpans)) - - return ack, nil } diff --git a/go.mod b/go.mod index 0ca70cfe..2146fae5 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,12 @@ require ( contrib.go.opencensus.io/exporter/ocagent v0.3.0 contrib.go.opencensus.io/exporter/stackdriver v0.7.0 git.apache.org/thrift.git v0.0.0-20181101003639-92be4f312b88 // indirect + github.com/BurntSushi/toml v0.3.1 // indirect github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895 // indirect github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20181026070331-e7c4bd17b329 github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 // indirect github.com/aws/aws-sdk-go v1.15.68 // indirect + github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect github.com/census-instrumentation/opencensus-proto v0.1.0 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/gogo/googleapis v1.1.0 // indirect @@ -20,13 +22,18 @@ require ( github.com/jaegertracing/jaeger v1.7.0 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect github.com/kr/pretty v0.1.0 // indirect + github.com/mitchellh/mapstructure v1.1.2 // indirect github.com/opentracing/opentracing-go v1.0.2 // indirect github.com/openzipkin/zipkin-go v0.1.3 github.com/philhofer/fwd v1.0.0 // indirect github.com/pkg/errors v0.8.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a // indirect github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.3 // indirect + github.com/spf13/viper v1.2.1 + github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 // indirect + github.com/stretchr/objx v0.1.1 // indirect github.com/stretchr/testify v1.2.2 // indirect github.com/tinylib/msgp v1.0.2 // indirect github.com/uber-go/atomic v1.3.2 // indirect diff --git a/go.sum b/go.sum index ddd5c0eb..dde3dbdd 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999 h1:sihTnRgTOUSCQz0i git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= git.apache.org/thrift.git v0.0.0-20181101003639-92be4f312b88 h1:km2LQYVCbsEbT7HCJ/nrMopI5CvJUkniCcSf9ZP5+iQ= git.apache.org/thrift.git v0.0.0-20181101003639-92be4f312b88/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895 h1:dmc/C8bpE5VkQn65PNbbyACDC8xw8Hpp/NEurdPmQDQ= github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20181026070331-e7c4bd17b329 h1:WOxkY7ClXANNyQQuq4rxQrM/nQnjXCpvqY0ipYxB9cQ= @@ -21,11 +23,16 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX github.com/Shopify/toxiproxy v2.1.3+incompatible h1:awiJqUYH4q4OmoBiRccJykjd7B+w0loJi2keSna4X/M= github.com/Shopify/toxiproxy v2.1.3+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:CZI8h5fmYwCCvd2RMSsjLqHN6OqABlWJweFKxz4vdEs= +github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:CZI8h5fmYwCCvd2RMSsjLqHN6OqABlWJweFKxz4vdEs= +github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/aws/aws-sdk-go v1.15.31/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= github.com/aws/aws-sdk-go v1.15.68 h1:2+CJhKkxU/ldztVaJ7V5adR1/ZnYl+oc7ufq2Bl18BQ= github.com/aws/aws-sdk-go v1.15.68/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE= +github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/census-instrumentation/opencensus-proto v0.0.2-0.20180913191712-f303ae3f8d6a/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.0.2/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.1.0 h1:VwZ9smxzX8u14/125wHIX7ARV+YhR+L4JADswwxWK0Y= @@ -41,6 +48,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-ini/ini v1.25.4 h1:Mujh4R/dH6YL8bxuISne3xX2+qcQ9p0IxKAP6ExWoUo= github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= @@ -67,6 +76,8 @@ github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/grpc-ecosystem/grpc-gateway v1.5.0 h1:WcmKMm43DR7RdtlkEXQJyo5ws8iTp98CyhCCbOHMvNI= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jaegertracing/jaeger v1.7.0 h1:RVpmOTj7Zb9QRFHI5CYvkYSTR8Cdn/PhM5kBxA4pZBM= @@ -81,12 +92,21 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mitchellh/mapstructure v1.0.0 h1:vVpGvMXJPqSDh2VYHF7gsfQj8Ncx+Xw5Y1KHeTRY+7I= +github.com/mitchellh/mapstructure v1.0.0/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= github.com/openzipkin/zipkin-go v0.1.3 h1:36hTtUTQR/vPX7YVJo2PYexSbHdAJiAkDrjuXw/YlYQ= github.com/openzipkin/zipkin-go v0.1.3/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v0.0.0-20181005164709-635575b42742 h1:wKfigKMTgvSzBLIVvB5QaBBQI0odU6n45/UKSphjLus= @@ -96,16 +116,35 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a h1:AA9vgIBDjMHPC2McaGPojgV2dcI78ZC0TLNhYCXEKH8= +github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a/go.mod h1:lzZQ3Noex5pfAy7mkAeCjcBDteYU85uWWnJ/y6gKU8k= +github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e h1:n/3MEhJQjQxrOUCzh1Y3Re6aJUUWRp2M9+Oc3eVn/54= github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 h1:agujYaXJSxSo18YNX3jzl+4G6Bstwt+kqv47GS12uL0= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg= +github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.2.1 h1:bIcUwXqLseLF3BDAZduuNfekWG87ibtFxi59Bq+oI9M= +github.com/spf13/viper v1.2.1/go.mod h1:P4AexN0a+C9tGAnUFNwDMYYZv3pjFuvmeiMyKRaNVlI= +github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 h1:7z3LSn867ex6VSaahyKadf4WtSsJIgne6A1WLOAGM8A= +github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25/go.mod h1:lbP8tGiBjZ5YWIc2fzuRpTaz0b/53vT6PEs3QuAWzuU= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8= @@ -146,6 +185,7 @@ golang.org/x/oauth2 v0.0.0-20181102170140-232e45548389/go.mod h1:N/0e6XlmueqKjAG golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181031143558-9b800f95dbbc h1:SdCq5U4J+PpbSDIl9bM0V1e1Ug1jsnBkAFvTs1htn7U= diff --git a/internal/collector/jaeger/receiver.go b/internal/collector/jaeger/receiver.go new file mode 100644 index 00000000..f91bb30d --- /dev/null +++ b/internal/collector/jaeger/receiver.go @@ -0,0 +1,63 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package jaegerreceiver wraps the functionality to start the end-point that +// receives Jaeger data sent by the jaeger-agent in jaeger.thrift format over +// TChannel and directly from clients in jaeger.thrift format over binary thrift +// protocol (HTTP transport). +// Note that the UDP transport is not supported since these protocol/transport +// are for task->jaeger-agent communication only and the receiver does not try to +// support jaeger-agent endpoints. +// TODO: add support for the jaeger proto endpoint released in jaeger 1.8package jaegerreceiver +package jaegerreceiver + +import ( + "context" + + "github.com/spf13/viper" + "go.uber.org/zap" + + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + "github.com/census-instrumentation/opencensus-service/receiver/jaeger" +) + +// Run starts the Jaeger receiver endpoint. +func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { + rOpts, err := builder.NewDefaultJaegerReceiverCfg().InitFromViper(v) + if err != nil { + return nil, err + } + + ctx := context.Background() + jtr, err := jaeger.New(ctx, rOpts.ThriftTChannelPort, rOpts.ThriftHTTPPort) + if err != nil { + return nil, err + } + + ss := processor.WrapWithSpanSink("jaeger", spanProc) + if err := jtr.StartTraceReception(ctx, ss); err != nil { + return nil, err + } + + logger.Info("Jaeger receiver is running.", + zap.Int("thrift-tchannel-port", rOpts.ThriftTChannelPort), + zap.Int("thrift-http-port", rOpts.ThriftHTTPPort)) + + closeFn := func() { + jtr.StopTraceReception(context.Background()) + } + + return closeFn, nil +} diff --git a/internal/collector/opencensus/receiver.go b/internal/collector/opencensus/receiver.go new file mode 100644 index 00000000..5571b9da --- /dev/null +++ b/internal/collector/opencensus/receiver.go @@ -0,0 +1,71 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package ocreceiver wraps the functionality to start the end-point that +// receives data directly in the OpenCensus format. +package ocreceiver + +import ( + "fmt" + "net" + "strconv" + "time" + + "github.com/spf13/viper" + "go.opencensus.io/plugin/ocgrpc" + "go.opencensus.io/stats/view" + "go.uber.org/zap" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/internal" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + "github.com/census-instrumentation/opencensus-service/receiver/opencensus/octrace" +) + +// Run starts the OpenCensus receiver endpoint. +func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { + rOpts, err := builder.NewDefaultOpenCensusReceiverCfg().InitFromViper(v) + if err != nil { + return nil, err + } + + grpcSrv := internal.GRPCServerWithObservabilityEnabled() + + if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { + return nil, fmt.Errorf("Failed to register ocgrpc.DefaultServerViews: %v", err) + } + + lis, err := net.Listen("tcp", ":"+strconv.FormatInt(int64(rOpts.Port), 10)) + if err != nil { + return nil, fmt.Errorf("Cannot bind tcp listener to address: %v", err) + } + + ss := processor.WrapWithSpanSink("oc", spanProc) + oci, err := octrace.New(ss, octrace.WithSpanBufferPeriod(1*time.Second)) + if err != nil { + return nil, fmt.Errorf("Failed to create the OpenCensus receiver: %v", err) + } + + agenttracepb.RegisterTraceServiceServer(grpcSrv, oci) + go func() { + if err := grpcSrv.Serve(lis); err != nil { + logger.Error("OpenCensus gRPC shutdown", zap.Error(err)) + } + }() + + logger.Info("OpenCensus receiver is running.", zap.Int("port", rOpts.Port)) + + return grpcSrv.Stop, nil +} diff --git a/internal/collector/processor/doc.go b/internal/collector/processor/doc.go new file mode 100644 index 00000000..6f24e7e0 --- /dev/null +++ b/internal/collector/processor/doc.go @@ -0,0 +1,19 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package processor is the central point on the collector processing: it +// aggregates and performs any operation that applies to all traces in the +// pipeline. Traces reach it after being converted to the OpenCensus protobuf +// format. +package processor diff --git a/internal/collector/processor/exporter_processor.go b/internal/collector/processor/exporter_processor.go new file mode 100644 index 00000000..d54b5ce8 --- /dev/null +++ b/internal/collector/processor/exporter_processor.go @@ -0,0 +1,41 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "context" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + "github.com/census-instrumentation/opencensus-service/exporter" +) + +type exporterSpanProcessor struct { + tes exporter.TraceExporterSink +} + +var _ SpanProcessor = (*exporterSpanProcessor)(nil) + +// NewTraceExporterProcessor creates processor that feeds SpanData to the given trace exporters. +func NewTraceExporterProcessor(traceExporters ...exporter.TraceExporter) SpanProcessor { + return &exporterSpanProcessor{tes: exporter.MultiTraceExporters(traceExporters...)} +} + +func (sp *exporterSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + ack, err := sp.tes.ReceiveSpans(context.Background(), batch.Node, batch.Spans...) + if err != nil { + return ack.DroppedSpans, err + } + return 0, nil +} diff --git a/internal/collector/processor/multi_processor.go b/internal/collector/processor/multi_processor.go new file mode 100644 index 00000000..4f81effe --- /dev/null +++ b/internal/collector/processor/multi_processor.go @@ -0,0 +1,66 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "fmt" + "strings" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" +) + +// MultiSpanProcessor enables processing on multiple processors. +// For each incoming span batch, it calls ProcessSpans method on each span +// processor one-by-one. It aggregates success/failures/errors from all of +// them and reports the result upstream. +type MultiSpanProcessor []SpanProcessor + +var _ SpanProcessor = (*MultiSpanProcessor)(nil) + +// NewMultiSpanProcessor creates a MultiSpanProcessor from the variadic +// list of passed SpanProcessors. +func NewMultiSpanProcessor(procs ...SpanProcessor) MultiSpanProcessor { + return procs +} + +// ProcessSpans implements the SpanProcessor interface +func (msp MultiSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + var maxFailures uint64 + var errors []error + for _, sp := range msp { + failures, err := sp.ProcessSpans(batch, spanFormat) + if err != nil { + errors = append(errors, err) + } + + if failures > maxFailures { + maxFailures = failures + } + } + + var err error + numErrors := len(errors) + if numErrors == 1 { + err = errors[0] + } else if numErrors > 1 { + errMsgs := make([]string, numErrors) + for _, err := range errors { + errMsgs = append(errMsgs, err.Error()) + } + err = fmt.Errorf("[%s]", strings.Join(errMsgs, "; ")) + } + + return maxFailures, err +} diff --git a/internal/collector/processor/multi_processor_test.go b/internal/collector/processor/multi_processor_test.go new file mode 100644 index 00000000..7330fed8 --- /dev/null +++ b/internal/collector/processor/multi_processor_test.go @@ -0,0 +1,144 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "fmt" + "testing" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" +) + +func TestMultiSpanProcessorMultiplexing(t *testing.T) { + processors := make([]SpanProcessor, 3) + for i := range processors { + processors[i] = &mockSpanProcessor{} + } + + tt := NewMultiSpanProcessor(processors...) + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: make([]*tracepb.Span, 7), + } + + var wantSpansCount = 0 + for i := 0; i < 2; i++ { + wantSpansCount += len(batch.Spans) + tt.ProcessSpans(batch, "test") + } + + for _, p := range processors { + m := p.(*mockSpanProcessor) + if m.TotalSpans != wantSpansCount { + t.Errorf("Wanted %d spans for every processor but got %d", wantSpansCount, m.TotalSpans) + return + } + } +} + +func TestMultiSpanProcessorSomeNotOk(t *testing.T) { + processors := make([]SpanProcessor, 3) + for i := range processors { + processors[i] = &mockSpanProcessor{} + } + + // Make one processor return false for some spans + m := processors[1].(*mockSpanProcessor) + wantFailures := uint64(2) + m.Failures = wantFailures + + tt := NewMultiSpanProcessor(processors...) + spans := make([]*tracepb.Span, wantFailures+3) + for i := range spans { + spans[i] = &tracepb.Span{} + } + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: spans, + } + + var wantSpansCount = 0 + for i := 0; i < 2; i++ { + failures, _ := tt.ProcessSpans(batch, "test") + batchSize := len(batch.Spans) + wantSpansCount += batchSize + if wantFailures != failures { + t.Errorf("Wanted %d failures but got %d", wantFailures, failures) + } + } + + for _, p := range processors { + m := p.(*mockSpanProcessor) + if m.TotalSpans != wantSpansCount { + t.Errorf("Wanted %d for every processor but got %d", wantSpansCount, m.TotalSpans) + return + } + } +} + +func TestMultiSpanProcessorWhenOneErrors(t *testing.T) { + processors := make([]SpanProcessor, 3) + for i := range processors { + processors[i] = &mockSpanProcessor{} + } + + // Make one processor return error + m := processors[1].(*mockSpanProcessor) + m.MustFail = true + + tt := NewMultiSpanProcessor(processors...) + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: make([]*tracepb.Span, 5), + } + + var wantSpansCount = 0 + for i := 0; i < 2; i++ { + failures, err := tt.ProcessSpans(batch, "test") + if err == nil { + t.Errorf("Wanted error got nil") + return + } + batchSize := len(batch.Spans) + wantSpansCount += batchSize + if failures != uint64(batchSize) { + t.Errorf("Wanted all spans to fail, got a different value.") + } + } + + for _, p := range processors { + m := p.(*mockSpanProcessor) + if m.TotalSpans != wantSpansCount { + t.Errorf("Wanted %d for every processor but got %d", wantSpansCount, m.TotalSpans) + return + } + } +} + +type mockSpanProcessor struct { + Failures uint64 + TotalSpans int + MustFail bool +} + +var _ SpanProcessor = &mockSpanProcessor{} + +func (p *mockSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + batchSize := len(batch.Spans) + p.TotalSpans += batchSize + if p.MustFail { + return uint64(batchSize), fmt.Errorf("this processor must fail") + } + + return p.Failures, nil +} diff --git a/internal/collector/processor/options.go b/internal/collector/processor/options.go new file mode 100644 index 00000000..37de9129 --- /dev/null +++ b/internal/collector/processor/options.go @@ -0,0 +1,110 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "time" + + "go.uber.org/zap" +) + +const ( + // DefaultNumWorkers is the default number of workers consuming from the processor queue + DefaultNumWorkers = 10 + // DefaultQueueSize is the default maximum number of span batches allowed in the processor's queue + DefaultQueueSize = 1000 +) + +type options struct { + logger *zap.Logger + name string + numWorkers int + queueSize int + backoffDelay time.Duration + extraFormatTypes []string + retryOnProcessingFailure bool +} + +// Option is a function that sets some option on the component. +type Option func(c *options) + +// Options is a factory for all available Option's +var Options options + +// WithLogger creates a Option that initializes the logger +func (options) WithLogger(logger *zap.Logger) Option { + return func(b *options) { + b.logger = logger + } +} + +// WithName creates an Option that initializes the name of the processor +func (options) WithName(name string) Option { + return func(b *options) { + b.name = name + } +} + +// WithNumWorkers creates an Option that initializes the number of queue consumers AKA workers +func (options) WithNumWorkers(numWorkers int) Option { + return func(b *options) { + b.numWorkers = numWorkers + } +} + +// WithQueueSize creates an Option that initializes the queue size +func (options) WithQueueSize(queueSize int) Option { + return func(b *options) { + b.queueSize = queueSize + } +} + +// WithBackoffDelay creates an Option that initializes the backoff delay +func (options) WithBackoffDelay(backoffDelay time.Duration) Option { + return func(b *options) { + b.backoffDelay = backoffDelay + } +} + +// WithExtraFormatTypes creates an Option that initializes the extra list of format types +func (options) WithExtraFormatTypes(extraFormatTypes []string) Option { + return func(b *options) { + b.extraFormatTypes = extraFormatTypes + } +} + +// WithRetryOnProcessingFailures creates an Option that initializes the retryOnProcessingFailure boolean +func (options) WithRetryOnProcessingFailures(retryOnProcessingFailure bool) Option { + return func(b *options) { + b.retryOnProcessingFailure = retryOnProcessingFailure + } +} + +func (o options) apply(opts ...Option) options { + ret := options{} + for _, opt := range opts { + opt(&ret) + } + if ret.logger == nil { + ret.logger = zap.NewNop() + } + if ret.numWorkers == 0 { + ret.numWorkers = DefaultNumWorkers + } + if ret.queueSize == 0 { + ret.queueSize = DefaultQueueSize + } + return ret +} diff --git a/internal/collector/processor/processor.go b/internal/collector/processor/processor.go new file mode 100644 index 00000000..e91f1ed3 --- /dev/null +++ b/internal/collector/processor/processor.go @@ -0,0 +1,47 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "go.uber.org/zap" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" +) + +// SpanProcessor handles batches of spans converted to OpenCensus proto format. +type SpanProcessor interface { + // ProcessSpans processes spans and return with the number of spans that failed and an error. + ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) + // TODO: (@pjanotti) For shutdown improvement, the interface needs a method to attempt that. +} + +// An initial processor that does not sends the data to any destination but helps debugging. +type debugSpanProcessor struct{ logger *zap.Logger } + +var _ SpanProcessor = (*debugSpanProcessor)(nil) + +func (sp *debugSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + if batch.Node == nil { + sp.logger.Warn("Received batch with nil Node", zap.String("format", spanFormat)) + } + + sp.logger.Debug("debugSpanProcessor", zap.String("originalFormat", spanFormat), zap.Int("#spans", len(batch.Spans))) + return 0, nil +} + +// NewNoopSpanProcessor creates an OC SpanProcessor that just drops the received data. +func NewNoopSpanProcessor(logger *zap.Logger) SpanProcessor { + return &debugSpanProcessor{logger: logger} +} diff --git a/internal/collector/processor/processor_to_sink.go b/internal/collector/processor/processor_to_sink.go new file mode 100644 index 00000000..e31e1128 --- /dev/null +++ b/internal/collector/processor/processor_to_sink.go @@ -0,0 +1,55 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "context" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "github.com/census-instrumentation/opencensus-service/spansink" +) + +type protoProcessorSink struct { + sourceFormat string + protoProcessor SpanProcessor +} + +var _ (spansink.Sink) = (*protoProcessorSink)(nil) + +// WrapWithSpanSink wraps a processor to be used as a span sink by receivers. +func WrapWithSpanSink(format string, p SpanProcessor) spansink.Sink { + return &protoProcessorSink{ + sourceFormat: format, + protoProcessor: p, + } +} + +func (ps *protoProcessorSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { + batch := &agenttracepb.ExportTraceServiceRequest{ + Node: node, + Spans: spans, + } + + failures, err := ps.protoProcessor.ProcessSpans(batch, ps.sourceFormat) + + ack := &spansink.Acknowledgement{ + SavedSpans: uint64(len(batch.Spans)) - failures, + DroppedSpans: failures, + } + + return ack, err +} diff --git a/internal/collector/processor/queued_processor.go b/internal/collector/processor/queued_processor.go new file mode 100644 index 00000000..98563e5b --- /dev/null +++ b/internal/collector/processor/queued_processor.go @@ -0,0 +1,144 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "sync" + "time" + + "github.com/jaegertracing/jaeger/pkg/queue" + "go.uber.org/zap" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" +) + +type queuedSpanProcessor struct { + queue *queue.BoundedQueue + logger *zap.Logger + sender SpanProcessor + numWorkers int + retryOnProcessingFailure bool + backoffDelay time.Duration + stopCh chan struct{} + stopOnce sync.Once +} + +var _ SpanProcessor = (*queuedSpanProcessor)(nil) + +type queueItem struct { + queuedTime time.Time + batch *agenttracepb.ExportTraceServiceRequest + spanFormat string +} + +// NewQueuedSpanProcessor returns a span processor that maintains a bounded +// in-memory queue of span batches, and sends out span batches using the +// provided sender +func NewQueuedSpanProcessor(sender SpanProcessor, opts ...Option) SpanProcessor { + sp := newQueuedSpanProcessor(sender, opts...) + + sp.queue.StartConsumers(sp.numWorkers, func(item interface{}) { + value := item.(*queueItem) + sp.processItemFromQueue(value) + }) + + return sp +} + +func newQueuedSpanProcessor(sender SpanProcessor, opts ...Option) *queuedSpanProcessor { + options := Options.apply(opts...) + boundedQueue := queue.NewBoundedQueue(options.queueSize, func(item interface{}) {}) + return &queuedSpanProcessor{ + queue: boundedQueue, + logger: options.logger, + numWorkers: options.numWorkers, + sender: sender, + retryOnProcessingFailure: options.retryOnProcessingFailure, + backoffDelay: options.backoffDelay, + stopCh: make(chan struct{}), + } +} + +// Stop halts the span processor and all its goroutines. +func (sp *queuedSpanProcessor) Stop() { + sp.stopOnce.Do(func() { + close(sp.stopCh) + sp.queue.Stop() + }) +} + +// ProcessSpans implements the SpanProcessor interface +func (sp *queuedSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (failures uint64, err error) { + allAdded := sp.enqueueSpanBatch(batch, spanFormat) + if !allAdded { + failures = uint64(len(batch.Spans)) + } + return +} + +func (sp *queuedSpanProcessor) enqueueSpanBatch(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) bool { + item := &queueItem{ + queuedTime: time.Now(), + batch: batch, + spanFormat: spanFormat, + } + addedToQueue := sp.queue.Produce(item) + if !addedToQueue { + sp.onItemDropped(item) + } + return addedToQueue +} + +func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) { + // TODO: @(pjanotti) metrics: startTime := time.Now() + // TODO: + _, err := sp.sender.ProcessSpans(item.batch, item.spanFormat) + if err != nil { + batchSize := len(item.batch.Spans) + if !sp.retryOnProcessingFailure { + // throw away the batch + sp.logger.Error("Failed to process batch, discarding", zap.Int("batch-size", batchSize)) + sp.onItemDropped(item) + } else { + // TODO: (@pjanotti) do not put it back on the end of the queue, retry with it directly. + // This will have the benefit of keeping the batch closer to related ones in time. + if !sp.queue.Produce(item) { + sp.logger.Error("Failed to process batch and failed to re-enqueue", zap.Int("batch-size", batchSize)) + sp.onItemDropped(item) + } else { + sp.logger.Warn("Failed to process batch, re-enqueued", zap.Int("batch-size", batchSize)) + } + } + // back-off for configured delay, but get interrupted when shutting down + if sp.backoffDelay > 0 { + sp.logger.Warn("Backing off before next attempt", + zap.Duration("backoff-delay", sp.backoffDelay)) + select { + case <-sp.stopCh: + sp.logger.Info("Interrupted due to shutdown") + break + case <-time.After(sp.backoffDelay): + sp.logger.Info("Resume processing") + break + } + } + } +} + +func (sp *queuedSpanProcessor) onItemDropped(item *queueItem) { + sp.logger.Warn("Span batch dropped", + zap.Int("#spans", len(item.batch.Spans)), + zap.String("spanSource", item.spanFormat)) +} diff --git a/internal/collector/processor/queued_processor_test.go b/internal/collector/processor/queued_processor_test.go new file mode 100644 index 00000000..1443a8e4 --- /dev/null +++ b/internal/collector/processor/queued_processor_test.go @@ -0,0 +1,82 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "sync" + "sync/atomic" + "testing" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" +) + +func TestQueueProcessorHappyPath(t *testing.T) { + mockProc := newMockConcurrentSpanProcessor() + qp := NewQueuedSpanProcessor(mockProc) + goFn := func(b *agenttracepb.ExportTraceServiceRequest) { + qp.ProcessSpans(b, "test") + } + + spans := []*tracepb.Span{{}} + wantBatches := 10 + wantSpans := 0 + for i := 0; i < wantBatches; i++ { + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: spans, + } + wantSpans += len(spans) + spans = append(spans, &tracepb.Span{}) + fn := func() { goFn(batch) } + mockProc.runConcurrently(fn) + } + + // Wait until all batches received + mockProc.awaitAsyncProcessing() + + if wantBatches != int(mockProc.batchCount) { + t.Fatalf("Wanted %d batches, got %d", wantBatches, mockProc.batchCount) + } + if wantSpans != int(mockProc.spanCount) { + t.Fatalf("Wanted %d spans, got %d", wantSpans, mockProc.spanCount) + } +} + +type mockConcurrentSpanProcessor struct { + waitGroup *sync.WaitGroup + batchCount int32 + spanCount int32 +} + +var _ SpanProcessor = (*mockConcurrentSpanProcessor)(nil) + +func (p *mockConcurrentSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + atomic.AddInt32(&p.batchCount, 1) + atomic.AddInt32(&p.spanCount, int32(len(batch.Spans))) + p.waitGroup.Done() + return 0, nil +} + +func newMockConcurrentSpanProcessor() *mockConcurrentSpanProcessor { + return &mockConcurrentSpanProcessor{waitGroup: new(sync.WaitGroup)} +} +func (p *mockConcurrentSpanProcessor) runConcurrently(fn func()) { + p.waitGroup.Add(1) + go fn() +} + +func (p *mockConcurrentSpanProcessor) awaitAsyncProcessing() { + p.waitGroup.Wait() +} diff --git a/internal/collector/zipkin/receiver.go b/internal/collector/zipkin/receiver.go new file mode 100644 index 00000000..69a223db --- /dev/null +++ b/internal/collector/zipkin/receiver.go @@ -0,0 +1,66 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package zipkinreceiver wraps the functionality to start the end-point that +// receives Zipkin traces. +package zipkinreceiver + +import ( + "fmt" + "net" + "net/http" + "strconv" + + "github.com/spf13/viper" + "go.uber.org/zap" + + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + zr "github.com/census-instrumentation/opencensus-service/receiver/zipkin" +) + +// Run starts the Zipkin receiver endpoint. +func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { + rOpts, err := builder.NewDefaultZipkinReceiverCfg().InitFromViper(v) + if err != nil { + return nil, err + } + + // TODO: (@pjanotti) when Zipkin implementation of StartTraceReceiver is working, change this code (this temporarily). + ss := processor.WrapWithSpanSink("zipkin", spanProc) + addr := ":" + strconv.FormatInt(int64(rOpts.Port), 10) + zi, err := zr.New(ss) + if err != nil { + return nil, fmt.Errorf("Failed to create the Zipkin receiver: %v", err) + } + + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("Cannot bind Zipkin receiver to address %q: %v", addr, err) + } + mux := http.NewServeMux() + mux.Handle("/api/v2/spans", zi) + go func() { + if err := http.Serve(ln, mux); err != nil { + logger.Fatal("Failed to serve the Zipkin receiver: %v", zap.Error(err)) + } + }() + + logger.Info("Zipkin receiver is running.", zap.Int("port", rOpts.Port)) + + doneFn := func() { + ln.Close() + } + return doneFn, nil +}