From 6f8e6f0917cb08098f67efee8d0c64dd33b639da Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Wed, 7 Nov 2018 21:21:46 -0800 Subject: [PATCH 1/6] Start adding pipeline to collector This is the basis to do the work for actually implementing OC collector in the short run. At first enable receivers: 1. OC Receiver 2. Jaeger 3. Zipkin The queue processor is added but not connected to exporters by default, although there are options to do so. As previously indicated before there is some duplication of code with the agent, this should be reduced as we implement the interfaces and project coalesce around certain patterns. The queue is missing its metrics and those will be added soon. --- CONTRIBUTING.md | 10 + README.md | 24 +- cmd/occollector/app/builder/builder.go | 138 ++++++++++ cmd/occollector/app/builder/builder_test.go | 80 ++++++ .../builder/testdata/receivers_disabled.yaml | 5 + .../builder/testdata/receivers_enabled.yaml | 5 + cmd/occollector/app/collector/collector.go | 259 ++++++++++++++++++ cmd/occollector/main.go | 79 +----- go.mod | 7 + go.sum | 40 +++ internal/collector/jaeger/receiver.go | 57 ++++ internal/collector/opencensus/receiver.go | 70 +++++ internal/collector/processor/doc.go | 19 ++ .../collector/processor/exporter_processor.go | 41 +++ .../collector/processor/multi_processor.go | 79 ++++++ .../processor/multi_processor_test.go | 140 ++++++++++ internal/collector/processor/options.go | 110 ++++++++ internal/collector/processor/processor.go | 47 ++++ .../collector/processor/processor_to_sink.go | 55 ++++ .../collector/processor/queued_processor.go | 146 ++++++++++ .../processor/queued_processor_test.go | 71 +++++ internal/collector/zipkin/receiver.go | 65 +++++ receiver/jaeger/trace_receiver.go | 34 ++- .../trace/jaegerthrift_to_protospan_test.go | 14 +- 24 files changed, 1495 insertions(+), 100 deletions(-) create mode 100644 cmd/occollector/app/builder/builder.go create mode 100644 cmd/occollector/app/builder/builder_test.go create mode 100644 cmd/occollector/app/builder/testdata/receivers_disabled.yaml create mode 100644 cmd/occollector/app/builder/testdata/receivers_enabled.yaml create mode 100644 cmd/occollector/app/collector/collector.go create mode 100644 internal/collector/jaeger/receiver.go create mode 100644 internal/collector/opencensus/receiver.go create mode 100644 internal/collector/processor/doc.go create mode 100644 internal/collector/processor/exporter_processor.go create mode 100644 internal/collector/processor/multi_processor.go create mode 100644 internal/collector/processor/multi_processor_test.go create mode 100644 internal/collector/processor/options.go create mode 100644 internal/collector/processor/processor.go create mode 100644 internal/collector/processor/processor_to_sink.go create mode 100644 internal/collector/processor/queued_processor.go create mode 100644 internal/collector/processor/queued_processor_test.go create mode 100644 internal/collector/zipkin/receiver.go diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 8b41cba0..703d4747 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -53,3 +53,13 @@ $ git checkout -b feature $ git commit $ git push fork feature ``` + +## General Notes + +This project uses go 1.11 and Travis for CI. + +Travis CI uses the Makefile with the default target, it is recommended to +run it before submitting your PR. It runs `gofmt -s` (simplify) and `golint`. + +The depencies are managed with `go mod` if you work with the sources under your +`$GOPATH` you need to set the environment variable `GO111MODULE` to `on`. \ No newline at end of file diff --git a/README.md b/README.md index c4a1389a..23e54ee8 100644 --- a/README.md +++ b/README.md @@ -292,20 +292,38 @@ The collector is in its initial development stages. It can be run directly from sources, binary, or a Docker image. 1. Run from sources: -``` +```shell $ go run github.com/census-instrumentation/opencensus-service/cmd/occollector ``` 2. Run from binary (from the root of your repo): -``` +```shell $ make collector $ ./bin/occollector_$($GOOS) ``` 3. Build a Docker scratch image and use the appropria Docker command for your scenario: -``` +```shell $ make docker-collector $ docker run --rm -it -p 55678:55678 occollector ``` +4. It can be configured via command-line or config file: +```shell +OpenCensus Collector + +Usage: + occollector [flags] + +Flags: + --add-queued-processor Flag to wrap one processor with the queued processor (flag will be remove soon, dev helper) + --config string Path to the config file + -h, --help help for occollector + --log-level string Output level of logs (TRACE, DEBUG, INFO, WARN, ERROR, FATAL) (default "INFO") + --noop-processor Flag to add the no-op processor (combine with log level DEBUG to log incoming spans) + --receive-jaeger Flag to run the Jaeger receiver (i.e.: Jaeger Collector) + --receive-oc Flag to run the OpenCensus receiver + --receive-zipkin Flag to run the Zipkin receiver +``` + [travis-image]: https://travis-ci.org/census-instrumentation/opencensus-service.svg?branch=master [travis-url]: https://travis-ci.org/census-instrumentation/opencensus-service [godoc-image]: https://godoc.org/github.com/census-instrumentation/opencensus-service?status.svg diff --git a/cmd/occollector/app/builder/builder.go b/cmd/occollector/app/builder/builder.go new file mode 100644 index 00000000..9b711dce --- /dev/null +++ b/cmd/occollector/app/builder/builder.go @@ -0,0 +1,138 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package builder handles the options to build the OpenCensus collector +// pipeline. +package builder + +import ( + "fmt" + "strings" + + "github.com/spf13/viper" +) + +const ( + receiversRoot = "receivers" + jaegerEntry = "jaeger" + opencensusEntry = "opencensus" + zipkinEntry = "zipkin" +) + +// JaegerReceiverCfg holds configuration for Jaeger receivers. +type JaegerReceiverCfg struct { + // JaegerThriftTChannelPort is the port that the relay receives on for jaeger thrift tchannel requests + JaegerThriftTChannelPort int `mapstructure:"thrift-tchannel-port"` + // ReceiverJaegerHTTPPort is the port that the relay receives on for jaeger thrift http requests + JaegerThriftHTTPPort int `mapstructure:"thrift-http-port"` +} + +// JaegerReceiverEnabled checks if the Jaeger receiver is enabled, via a command-line flag, environment +// variable, or configuration file. +func JaegerReceiverEnabled(v *viper.Viper, cmdFlag string) bool { + return featureEnabled(v, cmdFlag, receiversRoot, jaegerEntry) +} + +// NewJaegerReceiverCfg returns an instance of JaegerReceiverCfg with default values +func NewJaegerReceiverCfg() *JaegerReceiverCfg { + opts := &JaegerReceiverCfg{ + JaegerThriftTChannelPort: 14267, + JaegerThriftHTTPPort: 14268, + } + return opts +} + +// InitFromViper returns a JaegerReceiverCfg according to the configuration. +func (cfg *JaegerReceiverCfg) InitFromViper(v *viper.Viper) (*JaegerReceiverCfg, error) { + return cfg, initFromViper(cfg, v, receiversRoot, jaegerEntry) +} + +// OpenCensusReceiverCfg holds configuration for OpenCensus receiver. +type OpenCensusReceiverCfg struct { + // Port is the port that the receiver will use + Port int `mapstructure:"port"` +} + +// OpenCensusReceiverEnabled checks if the OpenCensus receiver is enabled, via a command-line flag, environment +// variable, or configuration file. +func OpenCensusReceiverEnabled(v *viper.Viper, cmdFlag string) bool { + return featureEnabled(v, cmdFlag, receiversRoot, opencensusEntry) +} + +// NewOpenCensusReceiverCfg returns an instance of OpenCensusReceiverCfg with default values +func NewOpenCensusReceiverCfg() *OpenCensusReceiverCfg { + opts := &OpenCensusReceiverCfg{ + Port: 55678, + } + return opts +} + +// InitFromViper returns a OpenCensusReceiverCfg according to the configuration. +func (cfg *OpenCensusReceiverCfg) InitFromViper(v *viper.Viper) (*OpenCensusReceiverCfg, error) { + return cfg, initFromViper(cfg, v, receiversRoot, opencensusEntry) +} + +// ZipkinReceiverCfg holds configuration for Zipkin receiver. +type ZipkinReceiverCfg struct { + // Port is the port that the receiver will use + Port int `mapstructure:"port"` +} + +// ZipkinReceiverEnabled checks if the Zipkin receiver is enabled, via a command-line flag, environment +// variable, or configuration file. +func ZipkinReceiverEnabled(v *viper.Viper, cmdFlag string) bool { + return featureEnabled(v, cmdFlag, receiversRoot, zipkinEntry) +} + +// NewZipkinReceiverCfg returns an instance of ZipkinReceiverCfg with default values +func NewZipkinReceiverCfg() *ZipkinReceiverCfg { + opts := &ZipkinReceiverCfg{ + Port: 9411, + } + return opts +} + +// InitFromViper returns a ZipkinReceiverCfg according to the configuration. +func (cfg *ZipkinReceiverCfg) InitFromViper(v *viper.Viper) (*ZipkinReceiverCfg, error) { + return cfg, initFromViper(cfg, v, receiversRoot, zipkinEntry) +} + +// Helper functions + +func initFromViper(cfg interface{}, v *viper.Viper, labels ...string) error { + v = getViperSub(v, labels...) + if v == nil { + return nil + } + if err := v.Unmarshal(cfg); err != nil { + return fmt.Errorf("Failed to read configuration for %s %v", strings.Join(labels, ": "), err) + } + + return nil +} + +func getViperSub(v *viper.Viper, labels ...string) *viper.Viper { + for _, label := range labels { + v = v.Sub(label) + if v == nil { + return nil + } + } + + return v +} + +func featureEnabled(v *viper.Viper, cmdFlag string, labels ...string) bool { + return v.GetBool(cmdFlag) || (getViperSub(v, labels...) != nil) +} diff --git a/cmd/occollector/app/builder/builder_test.go b/cmd/occollector/app/builder/builder_test.go new file mode 100644 index 00000000..df68b6fb --- /dev/null +++ b/cmd/occollector/app/builder/builder_test.go @@ -0,0 +1,80 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package builder + +import ( + "reflect" + "testing" + + "github.com/spf13/viper" +) + +func TestReceiversEnabledByPresenceWithDefaultSettings(t *testing.T) { + v, err := loadViperFromFile("./testdata/receivers_enabled.yaml") + if err != nil { + t.Fatalf("Failed to load viper from test file: %v", err) + } + + j, oc, z := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z") + if !j || !oc || !z { + t.Fatalf("Some of the expected receivers were not enabled j:%v oc:%v z:%v", j, oc, z) + } + + wj := NewJaegerReceiverCfg() + gj, err := wj.InitFromViper(v) + if err != nil { + t.Errorf("Failed to InitFromViper for Jaeger receiver: %v", err) + } else if !reflect.DeepEqual(wj, gj) { + t.Errorf("Incorrect config for Jaeger receiver, want %v got %v", wj, gj) + } + + woc := NewOpenCensusReceiverCfg() + goc, err := woc.InitFromViper(v) + if err != nil { + t.Errorf("Failed to InitFromViper for OpenCensus receiver: %v", err) + } else if !reflect.DeepEqual(woc, goc) { + t.Errorf("Incorrect config for OpenCensus receiver, want %v got %v", woc, goc) + } + + wz := NewZipkinReceiverCfg() + gz, err := wz.InitFromViper(v) + if err != nil { + t.Errorf("Failed to InitFromViper for Zipkin receiver: %v", err) + } else if !reflect.DeepEqual(wz, gz) { + t.Errorf("Incorrect config for Zipkin receiver, want %v got %v", wz, gz) + } +} + +func TestReceiversDisabledByPresenceWithDefaultSettings(t *testing.T) { + v, err := loadViperFromFile("./testdata/receivers_disabled.yaml") + if err != nil { + t.Fatalf("Failed to load viper from test file: %v", err) + } + + j, oc, z := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z") + if j || oc || z { + t.Fatalf("Not all receivers were disabled j:%v oc:%v z:%v", j, oc, z) + } +} + +func loadViperFromFile(file string) (*viper.Viper, error) { + v := viper.New() + v.SetConfigFile(file) + err := v.ReadInConfig() + if err != nil { + return nil, err + } + return v, nil +} diff --git a/cmd/occollector/app/builder/testdata/receivers_disabled.yaml b/cmd/occollector/app/builder/testdata/receivers_disabled.yaml new file mode 100644 index 00000000..249d7d75 --- /dev/null +++ b/cmd/occollector/app/builder/testdata/receivers_disabled.yaml @@ -0,0 +1,5 @@ +# No receivers enabled, all commented out +receivers: + # jaeger: {} + # opencensus: {} + # zipkin: {} diff --git a/cmd/occollector/app/builder/testdata/receivers_enabled.yaml b/cmd/occollector/app/builder/testdata/receivers_enabled.yaml new file mode 100644 index 00000000..1287369a --- /dev/null +++ b/cmd/occollector/app/builder/testdata/receivers_enabled.yaml @@ -0,0 +1,5 @@ +# Enable receivers with default configuration +receivers: + jaeger: {} + opencensus: {} + zipkin: {} diff --git a/cmd/occollector/app/collector/collector.go b/cmd/occollector/app/collector/collector.go new file mode 100644 index 00000000..85fa1d22 --- /dev/null +++ b/cmd/occollector/app/collector/collector.go @@ -0,0 +1,259 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package collector handles the command-line, configuration, and runs the OC collector. +package collector + +import ( + "io/ioutil" + "log" + "os" + "os/signal" + "strings" + "syscall" + + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/exporter" + "github.com/census-instrumentation/opencensus-service/exporter/exporterparser" + "github.com/census-instrumentation/opencensus-service/internal/collector/jaeger" + "github.com/census-instrumentation/opencensus-service/internal/collector/opencensus" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + "github.com/census-instrumentation/opencensus-service/internal/collector/zipkin" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + configCfg = "config" + logLevelCfg = "log-level" + jaegerReceiverFlg = "receive-jaeger" + ocReceiverFlg = "receive-oc" + zipkinReceiverFlg = "receive-zipkin" + noopProcessorFlg = "noop-processor" + queuedProcessorFlg = "add-queued-processor" // TODO: (@pjanotti) this is temporary flag until it can be read from config. +) + +var ( + config string + + v = viper.New() + + logger *zap.Logger + + rootCmd = &cobra.Command{ + Use: "occollector", + Long: "OpenCensus Collector", + Run: func(cmd *cobra.Command, args []string) { + if file := v.GetString(configCfg); file != "" { + v.SetConfigFile(file) + err := v.ReadInConfig() + if err != nil { + log.Fatalf("Error loading config file %q: %v", file, err) + return + } + } + + execute() + }, + } +) + +func init() { + rootCmd.PersistentFlags().String(logLevelCfg, "INFO", "Output level of logs (TRACE, DEBUG, INFO, WARN, ERROR, FATAL)") + v.BindPFlag(logLevelCfg, rootCmd.PersistentFlags().Lookup(logLevelCfg)) + + // local flags + rootCmd.Flags().StringVar(&config, configCfg, "", "Path to the config file") + rootCmd.Flags().Bool(jaegerReceiverFlg, false, "Flag to run the Jaeger receiver (i.e.: Jaeger Collector)") + rootCmd.Flags().Bool(ocReceiverFlg, false, "Flag to run the OpenCensus receiver") + rootCmd.Flags().Bool(zipkinReceiverFlg, false, "Flag to run the Zipkin receiver") + rootCmd.Flags().Bool(noopProcessorFlg, false, "Flag to add the no-op processor (combine with log level DEBUG to log incoming spans)") + rootCmd.Flags().Bool(queuedProcessorFlg, false, "Flag to wrap one processor with the queued processor (flag will be remove soon, dev helper)") + + // TODO: (@pjanotti) add builder options as flags, before calls bellow. Likely it will require code re-org. + + v.AutomaticEnv() + v.SetEnvKeyReplacer(strings.NewReplacer("-", "_", ".", "_")) + v.BindPFlags(rootCmd.Flags()) +} + +func newLogger() (*zap.Logger, error) { + var level zapcore.Level + err := (&level).UnmarshalText([]byte(v.GetString(logLevelCfg))) + if err != nil { + return nil, err + } + conf := zap.NewProductionConfig() + conf.Level.SetLevel(level) + return conf.Build() +} + +func execute() { + var signalsChannel = make(chan os.Signal) + signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) + + var err error + logger, err = newLogger() + if err != nil { + log.Fatalf("Failed to get logger: %v", err) + } + + logger.Info("Starting...") + + // Build pipeline from its end: 1st exporters, the OC-proto queue processor, and + // finally the receivers. + + var closeFns []func() + var spanProcessors []processor.SpanProcessor + exportersCloseFns, exporters := createExporters() + closeFns = append(closeFns, exportersCloseFns...) + if len(exporters) > 0 { + // Exporters need an extra hop from OC-proto to span data: to workaround that for now + // we will use a special processor that transforms the data to a format that they can consume. + // TODO: (@pjanotti) we should avoid this step in the long run, its an extra hop just to re-use + // the exporters: this can lose node information and it is not ideal for performance and delegates + // the retry/buffering to the exporters (that are designed to run within the tracing process). + spanProcessors = append(spanProcessors, processor.NewTraceExporterProcessor(exporters...)) + } + + if v.GetBool(noopProcessorFlg) { + spanProcessors = append(spanProcessors, processor.NewNoopSpanProcessor(logger)) + } + + if len(spanProcessors) == 0 { + logger.Warn("Nothing to do: no processor was enabled. Shutting down.") + os.Exit(1) + } + + // TODO: (@pjanotti) construct queued processor from config options, for now just to exercise it, wrap one around + // the first span processor available. + if v.GetBool(queuedProcessorFlg) { + spanProcessors[0] = processor.NewQueuedSpanProcessor(spanProcessors[0]) + } + + // Wraps processors in a single one to be connected to all enabled receivers. + spanProcessor := processor.NewMultiSpanProcessor(spanProcessors...) + + receiversCloseFns := createReceivers(spanProcessor) + closeFns = append(closeFns, receiversCloseFns...) + + logger.Info("Collector is up and running.") + + <-signalsChannel + logger.Info("Starting shutdown...") + + // TODO: orderly shutdown: first receivers, then flushing pipelines giving + // senders a chance to send all their data. This may take time, the allowed + // time should be part of configuration. + for i := len(closeFns) - 1; i > 0; i-- { + closeFns[i]() + } + + logger.Info("Shutdown complete.") +} + +func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporter) { + // TODO: (@pjanotti) this is slightly modified from agent but in the end duplication, need to consolidate style and visibility. + parseFns := []struct { + name string + fn func([]byte) ([]exporter.TraceExporter, []func() error, error) + }{ + {name: "datadog", fn: exporterparser.DatadogTraceExportersFromYAML}, + {name: "stackdriver", fn: exporterparser.StackdriverTraceExportersFromYAML}, + {name: "zipkin", fn: exporterparser.ZipkinExportersFromYAML}, + {name: "jaeger", fn: exporterparser.JaegerExportersFromYAML}, + {name: "kafka", fn: exporterparser.KafkaExportersFromYAML}, + } + + if config == "" { + logger.Info("No config file, exporters can be only configured via the file.") + return + } + + cfgBlob, err := ioutil.ReadFile(config) + if err != nil { + logger.Fatal("Cannot read config file for exporters", zap.Error(err)) + } + + for _, cfg := range parseFns { + tes, tesDoneFns, err := cfg.fn(cfgBlob) + if err != nil { + logger.Fatal("Failed to create config for exporter", zap.String("exporter", cfg.name), zap.Error(err)) + } + + wasEnabled := false + for _, te := range tes { + if te != nil { + wasEnabled = true + traceExporters = append(traceExporters, te) + } + } + + for _, tesDoneFn := range tesDoneFns { + if tesDoneFn != nil { + wrapperFn := func() { + if err := tesDoneFn(); err != nil { + logger.Warn("Error when closing exporter", zap.String("exporter", cfg.name), zap.Error(err)) + } + } + doneFns = append(doneFns, wrapperFn) + } + } + + if wasEnabled { + logger.Info("Exporter enabled", zap.String("exporter", cfg.name)) + } + } + + return doneFns, traceExporters +} + +func createReceivers(spanProcessor processor.SpanProcessor) (closeFns []func()) { + var someReceiverEnabled bool + receivers := []struct { + name string + runFn func(*zap.Logger, *viper.Viper, processor.SpanProcessor) (func(), error) + ok bool + }{ + {"Jaeger", jaegerreceiver.Run, builder.JaegerReceiverEnabled(v, jaegerReceiverFlg)}, + {"OpenCensus", ocreceiver.Run, builder.OpenCensusReceiverEnabled(v, ocReceiverFlg)}, + {"Zipkin", zipkinreceiver.Run, builder.ZipkinReceiverEnabled(v, zipkinReceiverFlg)}, + } + + for _, receiver := range receivers { + if receiver.ok { + closeSrv, err := receiver.runFn(logger, v, spanProcessor) + if err != nil { + logger.Fatal("Cannot run receiver for "+receiver.name, zap.Error(err)) + } + closeFns = append(closeFns, closeSrv) + someReceiverEnabled = true + } + } + + if !someReceiverEnabled { + logger.Warn("Nothing to do: no receiver was enabled. Shutting down.") + os.Exit(1) + } + + return closeFns +} + +// Execute the application according to the command and configuration given +// by the user. +func Execute() error { + return rootCmd.Execute() +} diff --git a/cmd/occollector/main.go b/cmd/occollector/main.go index 1e8a12f9..8b12f534 100644 --- a/cmd/occollector/main.go +++ b/cmd/occollector/main.go @@ -20,86 +20,13 @@ package main import ( - "context" - "fmt" "log" - "os" - "os/signal" - "syscall" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" - tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" - "github.com/census-instrumentation/opencensus-service/receiver/opencensus" - "github.com/census-instrumentation/opencensus-service/spansink" - - "go.opencensus.io/plugin/ocgrpc" - "go.opencensus.io/stats/view" - "go.uber.org/zap" -) - -const ( - defaultOCAddress = ":55678" + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/collector" ) func main() { - var signalsChannel = make(chan os.Signal) - signal.Notify(signalsChannel, os.Interrupt, syscall.SIGTERM) - - // TODO: Allow configuration of logger and other items such as servers, etc. - logger, err := zap.NewProduction() - if err != nil { - log.Fatalf("Failed to get production logger: %v", err) - } - - logger.Info("Starting...") - - closeSrv, err := runOCServerWithReceiver(defaultOCAddress, logger) - if err != nil { - logger.Fatal("Cannot run opencensus server", zap.String("Address", defaultOCAddress), zap.Error(err)) - } - - logger.Info("Collector is up and running.") - - <-signalsChannel - logger.Info("Starting shutdown...") - - // TODO: orderly shutdown: first receivers, then flushing pipelines giving - // senders a chance to send all their data. This may take time, the allowed - // time should be part of configuration. - closeSrv() - - logger.Info("Shutdown complete.") -} - -func runOCServerWithReceiver(addr string, logger *zap.Logger) (func() error, error) { - if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { - return nil, fmt.Errorf("Failed to register ocgrpc.DefaultServerViews: %v", err) - } - - sr := &fakeSpanSink{ - logger: logger, + if err := collector.Execute(); err != nil { + log.Fatalf("Failed to run the collector: %v", err) } - - ocr, err := opencensus.New(addr) - if err != nil { - return nil, fmt.Errorf("Failed to create the OpenCensus receiver: %v", err) - } - if err := ocr.StartTraceReception(context.Background(), sr); err != nil { - return nil, fmt.Errorf("Failed to start OpenCensus TraceReceiver : %v", err) - } - return ocr.Stop, nil -} - -type fakeSpanSink struct { - logger *zap.Logger -} - -func (sr *fakeSpanSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { - ack := &spansink.Acknowledgement{ - SavedSpans: uint64(len(spans)), - } - - sr.logger.Info("ReceivedSpans", zap.Uint64("Received spans", ack.SavedSpans)) - - return ack, nil } diff --git a/go.mod b/go.mod index 0ca70cfe..2146fae5 100644 --- a/go.mod +++ b/go.mod @@ -5,10 +5,12 @@ require ( contrib.go.opencensus.io/exporter/ocagent v0.3.0 contrib.go.opencensus.io/exporter/stackdriver v0.7.0 git.apache.org/thrift.git v0.0.0-20181101003639-92be4f312b88 // indirect + github.com/BurntSushi/toml v0.3.1 // indirect github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895 // indirect github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20181026070331-e7c4bd17b329 github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 // indirect github.com/aws/aws-sdk-go v1.15.68 // indirect + github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b // indirect github.com/census-instrumentation/opencensus-proto v0.1.0 github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd // indirect github.com/gogo/googleapis v1.1.0 // indirect @@ -20,13 +22,18 @@ require ( github.com/jaegertracing/jaeger v1.7.0 github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af // indirect github.com/kr/pretty v0.1.0 // indirect + github.com/mitchellh/mapstructure v1.1.2 // indirect github.com/opentracing/opentracing-go v1.0.2 // indirect github.com/openzipkin/zipkin-go v0.1.3 github.com/philhofer/fwd v1.0.0 // indirect github.com/pkg/errors v0.8.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a // indirect github.com/spf13/cobra v0.0.3 github.com/spf13/pflag v1.0.3 // indirect + github.com/spf13/viper v1.2.1 + github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 // indirect + github.com/stretchr/objx v0.1.1 // indirect github.com/stretchr/testify v1.2.2 // indirect github.com/tinylib/msgp v1.0.2 // indirect github.com/uber-go/atomic v1.3.2 // indirect diff --git a/go.sum b/go.sum index ddd5c0eb..dde3dbdd 100644 --- a/go.sum +++ b/go.sum @@ -12,6 +12,8 @@ git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999 h1:sihTnRgTOUSCQz0i git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= git.apache.org/thrift.git v0.0.0-20181101003639-92be4f312b88 h1:km2LQYVCbsEbT7HCJ/nrMopI5CvJUkniCcSf9ZP5+iQ= git.apache.org/thrift.git v0.0.0-20181101003639-92be4f312b88/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= +github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895 h1:dmc/C8bpE5VkQn65PNbbyACDC8xw8Hpp/NEurdPmQDQ= github.com/DataDog/datadog-go v0.0.0-20180822151419-281ae9f2d895/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/opencensus-go-exporter-datadog v0.0.0-20181026070331-e7c4bd17b329 h1:WOxkY7ClXANNyQQuq4rxQrM/nQnjXCpvqY0ipYxB9cQ= @@ -21,11 +23,16 @@ github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWX github.com/Shopify/toxiproxy v2.1.3+incompatible h1:awiJqUYH4q4OmoBiRccJykjd7B+w0loJi2keSna4X/M= github.com/Shopify/toxiproxy v2.1.3+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:CZI8h5fmYwCCvd2RMSsjLqHN6OqABlWJweFKxz4vdEs= +github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7 h1:CZI8h5fmYwCCvd2RMSsjLqHN6OqABlWJweFKxz4vdEs= +github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/apache/thrift v0.0.0-20161221203622-b2a4d4ae21c7/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= github.com/aws/aws-sdk-go v1.15.31/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= github.com/aws/aws-sdk-go v1.15.68 h1:2+CJhKkxU/ldztVaJ7V5adR1/ZnYl+oc7ufq2Bl18BQ= github.com/aws/aws-sdk-go v1.15.68/go.mod h1:E3/ieXAlvM0XWO57iftYVDLLvQ824smPP3ATZkfNZeM= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE= +github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q= github.com/census-instrumentation/opencensus-proto v0.0.2-0.20180913191712-f303ae3f8d6a/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.0.2/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.1.0 h1:VwZ9smxzX8u14/125wHIX7ARV+YhR+L4JADswwxWK0Y= @@ -41,6 +48,8 @@ github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8 github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-ini/ini v1.25.4 h1:Mujh4R/dH6YL8bxuISne3xX2+qcQ9p0IxKAP6ExWoUo= github.com/go-ini/ini v1.25.4/go.mod h1:ByCAeIL28uOIIG0E3PJtZPDL8WnHpFKFOtgjp+3Ies8= @@ -67,6 +76,8 @@ github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/grpc-ecosystem/grpc-gateway v1.5.0 h1:WcmKMm43DR7RdtlkEXQJyo5ws8iTp98CyhCCbOHMvNI= github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw= +github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= +github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= github.com/jaegertracing/jaeger v1.7.0 h1:RVpmOTj7Zb9QRFHI5CYvkYSTR8Cdn/PhM5kBxA4pZBM= @@ -81,12 +92,21 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= +github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/mitchellh/mapstructure v1.0.0 h1:vVpGvMXJPqSDh2VYHF7gsfQj8Ncx+Xw5Y1KHeTRY+7I= +github.com/mitchellh/mapstructure v1.0.0/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= +github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= +github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= github.com/openzipkin/zipkin-go v0.1.3 h1:36hTtUTQR/vPX7YVJo2PYexSbHdAJiAkDrjuXw/YlYQ= github.com/openzipkin/zipkin-go v0.1.3/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8= +github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc= +github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v0.0.0-20181005164709-635575b42742 h1:wKfigKMTgvSzBLIVvB5QaBBQI0odU6n45/UKSphjLus= @@ -96,16 +116,35 @@ github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a h1:AA9vgIBDjMHPC2McaGPojgV2dcI78ZC0TLNhYCXEKH8= +github.com/prashantv/protectmem v0.0.0-20171002184600-e20412882b3a/go.mod h1:lzZQ3Noex5pfAy7mkAeCjcBDteYU85uWWnJ/y6gKU8k= +github.com/prometheus/client_golang v0.8.0 h1:1921Yw9Gc3iSc4VQh3PIoOqgPCZS7G/4xQNVUp8Mda8= github.com/prometheus/client_golang v0.8.0/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e h1:n/3MEhJQjQxrOUCzh1Y3Re6aJUUWRp2M9+Oc3eVn/54= github.com/prometheus/common v0.0.0-20180801064454-c7de2306084e/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273 h1:agujYaXJSxSo18YNX3jzl+4G6Bstwt+kqv47GS12uL0= github.com/prometheus/procfs v0.0.0-20180725123919-05ee40e3a273/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/spf13/afero v1.1.2 h1:m8/z1t7/fwjysjQRYbP0RD+bUIF/8tJwPdEZsI83ACI= +github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= +github.com/spf13/cast v1.2.0 h1:HHl1DSRbEQN2i8tJmtS6ViPyHx35+p51amrdsiTCrkg= +github.com/spf13/cast v1.2.0/go.mod h1:r2rcYCSwa1IExKTDiTfzaxqT2FNHs8hODu4LnUfgKEg= github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8= github.com/spf13/cobra v0.0.3/go.mod h1:1l0Ry5zgKvJasoi3XT1TypsSe7PqH0Sj9dhYf7v3XqQ= +github.com/spf13/jwalterweatherman v1.0.0 h1:XHEdyB+EcvlqZamSM4ZOMGlc93t6AcsBEu9Gc1vn7yk= +github.com/spf13/jwalterweatherman v1.0.0/go.mod h1:cQK4TGJAtQXfYWX+Ddv3mKDzgVb68N+wFjFa4jdeBTo= +github.com/spf13/pflag v1.0.2/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +github.com/spf13/viper v1.2.1 h1:bIcUwXqLseLF3BDAZduuNfekWG87ibtFxi59Bq+oI9M= +github.com/spf13/viper v1.2.1/go.mod h1:P4AexN0a+C9tGAnUFNwDMYYZv3pjFuvmeiMyKRaNVlI= +github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25 h1:7z3LSn867ex6VSaahyKadf4WtSsJIgne6A1WLOAGM8A= +github.com/streadway/quantile v0.0.0-20150917103942-b0c588724d25/go.mod h1:lbP8tGiBjZ5YWIc2fzuRpTaz0b/53vT6PEs3QuAWzuU= +github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/tinylib/msgp v1.0.2 h1:DfdQrzQa7Yh2es9SuLkixqxuXS2SxsdYn0KbdrOGWD8= @@ -146,6 +185,7 @@ golang.org/x/oauth2 v0.0.0-20181102170140-232e45548389/go.mod h1:N/0e6XlmueqKjAG golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180906133057-8cf3aee42992/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e h1:o3PsSEY8E4eXWkXrIP9YJALUkVZqzHJT5DOasTyn8Vs= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181031143558-9b800f95dbbc h1:SdCq5U4J+PpbSDIl9bM0V1e1Ug1jsnBkAFvTs1htn7U= diff --git a/internal/collector/jaeger/receiver.go b/internal/collector/jaeger/receiver.go new file mode 100644 index 00000000..004996ef --- /dev/null +++ b/internal/collector/jaeger/receiver.go @@ -0,0 +1,57 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package jaegerreceiver wraps the functionality to start the end-point that +// receives data directly in the Jaeger format as jaeger-collector (UDP as +// jaeger-agent currently is not supported). +package jaegerreceiver + +import ( + "context" + + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + "github.com/census-instrumentation/opencensus-service/receiver/jaeger" + "github.com/spf13/viper" + "go.uber.org/zap" +) + +// Run starts the OpenCensus receiver endpoint. +func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { + rOpts, err := builder.NewJaegerReceiverCfg().InitFromViper(v) + if err != nil { + return nil, err + } + + ctx := context.Background() + jtr, err := jaeger.New(ctx, rOpts.JaegerThriftTChannelPort, rOpts.JaegerThriftHTTPPort) + if err != nil { + return nil, err + } + + ss := processor.WrapWithSpanSink("jaeger", spanProc) + if err := jtr.StartTraceReception(ctx, ss); err != nil { + return nil, err + } + + logger.Info("Jaeger receiver is running.", + zap.Int("thrift-tchannel-port", rOpts.JaegerThriftTChannelPort), + zap.Int("thrift-http-port", rOpts.JaegerThriftHTTPPort)) + + closeFn := func() { + jtr.StopTraceReception(context.Background()) + } + + return closeFn, nil +} diff --git a/internal/collector/opencensus/receiver.go b/internal/collector/opencensus/receiver.go new file mode 100644 index 00000000..4d4877c4 --- /dev/null +++ b/internal/collector/opencensus/receiver.go @@ -0,0 +1,70 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package ocreceiver wraps the functionality to start the end-point that +// receives data directly in the OpenCensus format. +package ocreceiver + +import ( + "fmt" + "net" + "strconv" + "time" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + "github.com/census-instrumentation/opencensus-service/receiver/opencensus/octrace" + "github.com/spf13/viper" + "go.opencensus.io/plugin/ocgrpc" + "go.opencensus.io/stats/view" + "go.uber.org/zap" + "google.golang.org/grpc" +) + +// Run starts the OpenCensus receiver endpoint. +func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { + rOpts, err := builder.NewOpenCensusReceiverCfg().InitFromViper(v) + if err != nil { + return nil, err + } + + grpcSrv := grpc.NewServer() + + if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { + return nil, fmt.Errorf("Failed to register ocgrpc.DefaultServerViews: %v", err) + } + + lis, err := net.Listen("tcp", ":"+strconv.FormatInt(int64(rOpts.Port), 10)) + if err != nil { + return nil, fmt.Errorf("Cannot bind tcp listener to address: %v", err) + } + + ss := processor.WrapWithSpanSink("oc", spanProc) + oci, err := octrace.New(ss, octrace.WithSpanBufferPeriod(1*time.Second)) + if err != nil { + return nil, fmt.Errorf("Failed to create the OpenCensus receiver: %v", err) + } + + agenttracepb.RegisterTraceServiceServer(grpcSrv, oci) + go func() { + if err := grpcSrv.Serve(lis); err != nil { + logger.Error("OpenCensus gRPC shutdown", zap.Error(err)) + } + }() + + logger.Info("OpenCensus receiver is running.", zap.Int("port", rOpts.Port)) + + return grpcSrv.Stop, nil +} diff --git a/internal/collector/processor/doc.go b/internal/collector/processor/doc.go new file mode 100644 index 00000000..6f24e7e0 --- /dev/null +++ b/internal/collector/processor/doc.go @@ -0,0 +1,19 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package processor is the central point on the collector processing: it +// aggregates and performs any operation that applies to all traces in the +// pipeline. Traces reach it after being converted to the OpenCensus protobuf +// format. +package processor diff --git a/internal/collector/processor/exporter_processor.go b/internal/collector/processor/exporter_processor.go new file mode 100644 index 00000000..d54b5ce8 --- /dev/null +++ b/internal/collector/processor/exporter_processor.go @@ -0,0 +1,41 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "context" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + "github.com/census-instrumentation/opencensus-service/exporter" +) + +type exporterSpanProcessor struct { + tes exporter.TraceExporterSink +} + +var _ SpanProcessor = (*exporterSpanProcessor)(nil) + +// NewTraceExporterProcessor creates processor that feeds SpanData to the given trace exporters. +func NewTraceExporterProcessor(traceExporters ...exporter.TraceExporter) SpanProcessor { + return &exporterSpanProcessor{tes: exporter.MultiTraceExporters(traceExporters...)} +} + +func (sp *exporterSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + ack, err := sp.tes.ReceiveSpans(context.Background(), batch.Node, batch.Spans...) + if err != nil { + return ack.DroppedSpans, err + } + return 0, nil +} diff --git a/internal/collector/processor/multi_processor.go b/internal/collector/processor/multi_processor.go new file mode 100644 index 00000000..2dff6eed --- /dev/null +++ b/internal/collector/processor/multi_processor.go @@ -0,0 +1,79 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "fmt" + "strings" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" +) + +var ( + nilNodeReplacement = &commonpb.Node{ + Identifier: &commonpb.ProcessIdentifier{HostName: ""}, + ServiceInfo: &commonpb.ServiceInfo{Name: ""}, + } +) + +// MultiSpanProcessor enables processing on multiple processors. +// For each incoming span batch, it calls ProcessSpans method on each span +// processor one-by-one. It aggregates success/failures/errors from all of +// them and reports the result upstream. +type MultiSpanProcessor []SpanProcessor + +var _ SpanProcessor = (*MultiSpanProcessor)(nil) + +// NewMultiSpanProcessor creates a MultiSpanProcessor from the variadic +// list of passed SpanProcessors. +func NewMultiSpanProcessor(procs ...SpanProcessor) MultiSpanProcessor { + return procs +} + +// ProcessSpans implements the SpanProcessor interface +func (msp MultiSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + // TODO: (@pjanotti) receivers should never pass batches with nil Node but that happens, for now fill with replacement. + if batch.Node == nil { + batch.Node = nilNodeReplacement + } + + var maxFailures uint64 + var errors []error + for _, sp := range msp { + failures, err := sp.ProcessSpans(batch, spanFormat) + if err != nil { + errors = append(errors, err) + } + + if failures > maxFailures { + maxFailures = failures + } + } + + var err error + numErrors := len(errors) + if numErrors == 1 { + err = errors[0] + } else if numErrors > 1 { + errMsgs := make([]string, numErrors) + for _, err := range errors { + errMsgs = append(errMsgs, err.Error()) + } + err = fmt.Errorf("[%s]", strings.Join(errMsgs, "; ")) + } + + return maxFailures, err +} diff --git a/internal/collector/processor/multi_processor_test.go b/internal/collector/processor/multi_processor_test.go new file mode 100644 index 00000000..2b9ff302 --- /dev/null +++ b/internal/collector/processor/multi_processor_test.go @@ -0,0 +1,140 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "fmt" + "testing" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" +) + +func TestMultiSpanProcessorMultiplexing(t *testing.T) { + processors := make([]SpanProcessor, 3) + for i := range processors { + processors[i] = &mockSpanProcessor{} + } + + tt := NewMultiSpanProcessor(processors...) + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: make([]*tracepb.Span, 7), + } + + var wantSpansCount = 0 + for i := 0; i < 2; i++ { + wantSpansCount += len(batch.Spans) + tt.ProcessSpans(batch, "test") + } + + for _, p := range processors { + m := p.(*mockSpanProcessor) + if m.TotalSpans != wantSpansCount { + t.Errorf("Wanted %d spans for every processor but got %d", wantSpansCount, m.TotalSpans) + return + } + } +} + +func TestMultiSpanProcessorSomeNotOk(t *testing.T) { + processors := make([]SpanProcessor, 3) + for i := range processors { + processors[i] = &mockSpanProcessor{} + } + + // Make one processor return false for some spans + m := processors[1].(*mockSpanProcessor) + wantFailures := uint64(2) + m.Failures = wantFailures + + tt := NewMultiSpanProcessor(processors...) + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: make([]*tracepb.Span, wantFailures+3), + } + + var wantSpansCount = 0 + for i := 0; i < 2; i++ { + failures, _ := tt.ProcessSpans(batch, "test") + batchSize := len(batch.Spans) + wantSpansCount += batchSize + if wantFailures != failures { + t.Errorf("Wanted %d failures but got %d", wantFailures, failures) + } + } + + for _, p := range processors { + m := p.(*mockSpanProcessor) + if m.TotalSpans != wantSpansCount { + t.Errorf("Wanted %d for every processor but got %d", wantSpansCount, m.TotalSpans) + return + } + } +} + +func TestMultiSpanProcessorWhenOneErrors(t *testing.T) { + processors := make([]SpanProcessor, 3) + for i := range processors { + processors[i] = &mockSpanProcessor{} + } + + // Make one processor return error + m := processors[1].(*mockSpanProcessor) + m.MustFail = true + + tt := NewMultiSpanProcessor(processors...) + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: make([]*tracepb.Span, 5), + } + + var wantSpansCount = 0 + for i := 0; i < 2; i++ { + failures, err := tt.ProcessSpans(batch, "test") + if err == nil { + t.Errorf("Wanted error got nil") + return + } + batchSize := len(batch.Spans) + wantSpansCount += batchSize + if failures != uint64(batchSize) { + t.Errorf("Wanted all spans to fail, got a different value.") + } + } + + for _, p := range processors { + m := p.(*mockSpanProcessor) + if m.TotalSpans != wantSpansCount { + t.Errorf("Wanted %d for every processor but got %d", wantSpansCount, m.TotalSpans) + return + } + } +} + +type mockSpanProcessor struct { + Failures uint64 + TotalSpans int + MustFail bool +} + +var _ SpanProcessor = &mockSpanProcessor{} + +func (p *mockSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + batchSize := len(batch.Spans) + p.TotalSpans += batchSize + if p.MustFail { + return uint64(batchSize), fmt.Errorf("this processor must fail") + } + + return p.Failures, nil +} diff --git a/internal/collector/processor/options.go b/internal/collector/processor/options.go new file mode 100644 index 00000000..5e1c39c1 --- /dev/null +++ b/internal/collector/processor/options.go @@ -0,0 +1,110 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "time" + + "go.uber.org/zap" +) + +const ( + // DefaultNumWorkers is the default number of workers consuming from the processor queue + DefaultNumWorkers = 10 + // DefaultQueueSize is the default maximum number of span batches allowed in the processor's queue + DefaultQueueSize = 1000 +) + +type options struct { + logger *zap.Logger + name string + numWorkers int + queueSize int + backoffDelay time.Duration + extraFormatTypes []string + retryOnProcessingFailure bool +} + +// Option is a function that sets some option on the component. +type Option func(c *options) + +// Options is a factory for all available Option's +var Options options + +// Logger creates a Option that initializes the logger +func (options) Logger(logger *zap.Logger) Option { + return func(b *options) { + b.logger = logger + } +} + +// Name creates an Option that initializes the name of the processor +func (options) Name(name string) Option { + return func(b *options) { + b.name = name + } +} + +// NumWorkers creates an Option that initializes the number of queue consumers AKA workers +func (options) NumWorkers(numWorkers int) Option { + return func(b *options) { + b.numWorkers = numWorkers + } +} + +// QueueSize creates an Option that initializes the queue size +func (options) QueueSize(queueSize int) Option { + return func(b *options) { + b.queueSize = queueSize + } +} + +// BackoffDelay creates an Option that initializes the backoff delay +func (options) BackoffDelay(backoffDelay time.Duration) Option { + return func(b *options) { + b.backoffDelay = backoffDelay + } +} + +// ExtraFormatTypes creates an Option that initializes the extra list of format types +func (options) ExtraFormatTypes(extraFormatTypes []string) Option { + return func(b *options) { + b.extraFormatTypes = extraFormatTypes + } +} + +// RetryOnProcessingFailures creates an Option that initializes the retryOnProcessingFailure boolean +func (options) RetryOnProcessingFailures(retryOnProcessingFailure bool) Option { + return func(b *options) { + b.retryOnProcessingFailure = retryOnProcessingFailure + } +} + +func (o options) apply(opts ...Option) options { + ret := options{} + for _, opt := range opts { + opt(&ret) + } + if ret.logger == nil { + ret.logger = zap.NewNop() + } + if ret.numWorkers == 0 { + ret.numWorkers = DefaultNumWorkers + } + if ret.queueSize == 0 { + ret.queueSize = DefaultQueueSize + } + return ret +} diff --git a/internal/collector/processor/processor.go b/internal/collector/processor/processor.go new file mode 100644 index 00000000..5e1707af --- /dev/null +++ b/internal/collector/processor/processor.go @@ -0,0 +1,47 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "go.uber.org/zap" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" +) + +// SpanProcessor handles batches of spans converted to OpenCensus proto format. +type SpanProcessor interface { + // ProcessSpans processes spans and return with either a list of true/false success or an error + ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) + // TODO: (@pjanotti) For shutdown improvement, the interface needs a method to attempt that. +} + +// An initial processor that performs no operation. +type noopSpanProcessor struct{ logger *zap.Logger } + +var _ SpanProcessor = (*noopSpanProcessor)(nil) + +func (sp *noopSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + if batch.Node == nil { + sp.logger.Warn("Received batch with nil Node", zap.String("format", spanFormat)) + } + + sp.logger.Debug("noopSpanProcessor", zap.String("originalFormat", spanFormat), zap.Int("#spans", len(batch.Spans))) + return 0, nil +} + +// NewNoopSpanProcessor creates an OC SpanProcessor that just drops the received data. +func NewNoopSpanProcessor(logger *zap.Logger) SpanProcessor { + return &noopSpanProcessor{logger: logger} +} diff --git a/internal/collector/processor/processor_to_sink.go b/internal/collector/processor/processor_to_sink.go new file mode 100644 index 00000000..e31e1128 --- /dev/null +++ b/internal/collector/processor/processor_to_sink.go @@ -0,0 +1,55 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "context" + + commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" + "github.com/census-instrumentation/opencensus-service/spansink" +) + +type protoProcessorSink struct { + sourceFormat string + protoProcessor SpanProcessor +} + +var _ (spansink.Sink) = (*protoProcessorSink)(nil) + +// WrapWithSpanSink wraps a processor to be used as a span sink by receivers. +func WrapWithSpanSink(format string, p SpanProcessor) spansink.Sink { + return &protoProcessorSink{ + sourceFormat: format, + protoProcessor: p, + } +} + +func (ps *protoProcessorSink) ReceiveSpans(ctx context.Context, node *commonpb.Node, spans ...*tracepb.Span) (*spansink.Acknowledgement, error) { + batch := &agenttracepb.ExportTraceServiceRequest{ + Node: node, + Spans: spans, + } + + failures, err := ps.protoProcessor.ProcessSpans(batch, ps.sourceFormat) + + ack := &spansink.Acknowledgement{ + SavedSpans: uint64(len(batch.Spans)) - failures, + DroppedSpans: failures, + } + + return ack, err +} diff --git a/internal/collector/processor/queued_processor.go b/internal/collector/processor/queued_processor.go new file mode 100644 index 00000000..48b71523 --- /dev/null +++ b/internal/collector/processor/queued_processor.go @@ -0,0 +1,146 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "time" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + "github.com/jaegertracing/jaeger/pkg/queue" + "go.uber.org/zap" +) + +type queuedSpanProcessor struct { + queue *queue.BoundedQueue + /* TODO: (@pauloja) not doing metrics for now + metrics *cApp.SpanProcessorMetrics + batchMetrics *processorBatchMetrics + */ + logger *zap.Logger + sender SpanProcessor + numWorkers int + retryOnProcessingFailure bool + backoffDelay time.Duration + stopCh chan struct{} +} + +var _ SpanProcessor = (*queuedSpanProcessor)(nil) + +type queueItem struct { + queuedTime time.Time + batch *agenttracepb.ExportTraceServiceRequest + spanFormat string +} + +// NewQueuedSpanProcessor returns a span processor that maintains a bounded +// in-memory queue of span batches, and sends out span batches using the +// provided sender +func NewQueuedSpanProcessor(sender SpanProcessor, opts ...Option) SpanProcessor { + sp := newQueuedSpanProcessor(sender, opts...) + + sp.queue.StartConsumers(sp.numWorkers, func(item interface{}) { + value := item.(*queueItem) + sp.processItemFromQueue(value) + }) + + return sp +} + +func newQueuedSpanProcessor( + sender SpanProcessor, + opts ...Option, +) *queuedSpanProcessor { + options := Options.apply(opts...) + boundedQueue := queue.NewBoundedQueue(options.queueSize, func(item interface{}) {}) + return &queuedSpanProcessor{ + queue: boundedQueue, + logger: options.logger, + numWorkers: options.numWorkers, + sender: sender, + retryOnProcessingFailure: options.retryOnProcessingFailure, + backoffDelay: options.backoffDelay, + stopCh: make(chan struct{}), + } +} + +// Stop halts the span processor and all its go-routines. +func (sp *queuedSpanProcessor) Stop() { + close(sp.stopCh) + sp.queue.Stop() +} + +// ProcessSpans implements the SpanProcessor interface +func (sp *queuedSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + ok := sp.enqueueSpanBatch(batch, spanFormat) + if !ok { + return uint64(len(batch.Spans)), nil + } + return 0, nil +} + +func (sp *queuedSpanProcessor) enqueueSpanBatch(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) bool { + item := &queueItem{ + queuedTime: time.Now(), + batch: batch, + spanFormat: spanFormat, + } + addedToQueue := sp.queue.Produce(item) + if !addedToQueue { + sp.onItemDropped(item) + } + return addedToQueue +} + +func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) { + // TODO: @(pjanotti) metrics: startTime := time.Now() + // TODO: + _, err := sp.sender.ProcessSpans(item.batch, item.spanFormat) + if err != nil { + batchSize := len(item.batch.Spans) + if !sp.retryOnProcessingFailure { + // throw away the batch + sp.logger.Error("Failed to process batch, discarding", zap.Int("batch-size", batchSize)) + sp.onItemDropped(item) + } else { + // try to put it back at the end of queue for retry at a later time + addedToQueue := sp.queue.Produce(item) + if !addedToQueue { + sp.logger.Error("Failed to process batch and failed to re-enqueue", zap.Int("batch-size", batchSize)) + sp.onItemDropped(item) + } else { + sp.logger.Warn("Failed to process batch, re-enqueued", zap.Int("batch-size", batchSize)) + } + } + // back-off for configured delay, but get interrupted when shutting down + if sp.backoffDelay > 0 { + sp.logger.Warn("Backing off before next attempt", + zap.Duration("backoff-delay", sp.backoffDelay)) + select { + case <-sp.stopCh: + sp.logger.Info("Interrupted due to shutdown") + break + case <-time.After(sp.backoffDelay): + sp.logger.Info("Resume processing") + break + } + } + } +} + +func (sp *queuedSpanProcessor) onItemDropped(item *queueItem) { + sp.logger.Warn("Span batch dropped", + zap.Int("#spans", len(item.batch.Spans)), + zap.String("spanSource", item.spanFormat)) +} diff --git a/internal/collector/processor/queued_processor_test.go b/internal/collector/processor/queued_processor_test.go new file mode 100644 index 00000000..d8e7f091 --- /dev/null +++ b/internal/collector/processor/queued_processor_test.go @@ -0,0 +1,71 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package processor + +import ( + "sync" + "sync/atomic" + "testing" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1" +) + +func TestQueueProcessorHappyPath(t *testing.T) { + wg := &sync.WaitGroup{} + mockProc := &mockConcurrentSpanProcessor{waitGroup: wg} + qp := NewQueuedSpanProcessor(mockProc) + goFn := func(b *agenttracepb.ExportTraceServiceRequest) { + qp.ProcessSpans(b, "test") + } + + spans := []*tracepb.Span{{}} + wantBatches := 10 + wg.Add(wantBatches) + wantSpans := 0 + for i := 0; i < wantBatches; i++ { + batch := &agenttracepb.ExportTraceServiceRequest{ + Spans: spans, + } + wantSpans += len(spans) + spans = append(spans, &tracepb.Span{}) + go goFn(batch) + } + + // Wait until all batches received + wg.Wait() + + if wantBatches != int(mockProc.batchCount) { + t.Fatalf("Wanted %d batches, got %d", wantBatches, mockProc.batchCount) + } + if wantSpans != int(mockProc.spanCount) { + t.Fatalf("Wanted %d spans, got %d", wantSpans, mockProc.spanCount) + } +} + +type mockConcurrentSpanProcessor struct { + waitGroup *sync.WaitGroup + batchCount int32 + spanCount int32 +} + +var _ SpanProcessor = (*mockConcurrentSpanProcessor)(nil) + +func (p *mockConcurrentSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { + atomic.AddInt32(&p.batchCount, 1) + atomic.AddInt32(&p.spanCount, int32(len(batch.Spans))) + p.waitGroup.Done() + return 0, nil +} diff --git a/internal/collector/zipkin/receiver.go b/internal/collector/zipkin/receiver.go new file mode 100644 index 00000000..19f530a5 --- /dev/null +++ b/internal/collector/zipkin/receiver.go @@ -0,0 +1,65 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package zipkinreceiver wraps the functionality to start the end-point that +// receives Zipkin traces. +package zipkinreceiver + +import ( + "fmt" + "net" + "net/http" + "strconv" + + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + zr "github.com/census-instrumentation/opencensus-service/receiver/zipkin" + "github.com/spf13/viper" + "go.uber.org/zap" +) + +// Run starts the Zipkin receiver endpoint. +func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { + rOpts, err := builder.NewZipkinReceiverCfg().InitFromViper(v) + if err != nil { + return nil, err + } + + // TODO: (@pjanotti) when Zipkin implementation of StartTraceReceiver is working, change this code (this temporarily). + ss := processor.WrapWithSpanSink("zipkin", spanProc) + addr := ":" + strconv.FormatInt(int64(rOpts.Port), 10) + zi, err := zr.New(ss) + if err != nil { + return nil, fmt.Errorf("Failed to create the Zipkin receiver: %v", err) + } + + ln, err := net.Listen("tcp", addr) + if err != nil { + return nil, fmt.Errorf("Cannot bind Zipkin receiver to address %q: %v", addr, err) + } + mux := http.NewServeMux() + mux.Handle("/api/v2/spans", zi) + go func() { + if err := http.Serve(ln, mux); err != nil { + logger.Fatal("Failed to serve the Zipkin receiver: %v", zap.Error(err)) + } + }() + + logger.Info("Zipkin receiver is running.", zap.Int("port", rOpts.Port)) + + doneFn := func() { + ln.Close() + } + return doneFn, nil +} diff --git a/receiver/jaeger/trace_receiver.go b/receiver/jaeger/trace_receiver.go index 96893afb..0f094ee2 100644 --- a/receiver/jaeger/trace_receiver.go +++ b/receiver/jaeger/trace_receiver.go @@ -48,8 +48,8 @@ type jReceiver struct { tchannelPort int collectorHTTPPort int - tchannel *tchannel.Channel - collectorServer *http.Server + tchannelLn net.Listener + collectorLn net.Listener } const ( @@ -107,14 +107,14 @@ func (jr *jReceiver) StartTraceReception(ctx context.Context, spanSink spansink. return } tch.Serve(tln) - jr.tchannel = tch + jr.tchannelLn = tln // Now the collector that runs over HTTP caddr := jr.collectorAddr() cln, cerr := net.Listen("tcp", caddr) if cerr != nil { - // Abort and close tch - tch.Close() + // Abort and close tln + _ = tln.Close() err = fmt.Errorf("Failed to bind to Collector address %q: %v", caddr, cerr) return } @@ -122,10 +122,10 @@ func (jr *jReceiver) StartTraceReception(ctx context.Context, spanSink spansink. nr := mux.NewRouter() apiHandler := app.NewAPIHandler(jr) apiHandler.RegisterRoutes(nr) - jr.collectorServer = &http.Server{Handler: nr} go func() { - _ = jr.collectorServer.Serve(cln) + _ = http.Serve(cln, nr) }() + jr.collectorLn = cln // Otherwise no error was encountered, // finally set the spanSink @@ -142,15 +142,21 @@ func (jr *jReceiver) StopTraceReception(ctx context.Context) error { var err = errAlreadyStopped jr.stopOnce.Do(func() { var errs []error - if jr.collectorServer != nil { - if cerr := jr.collectorServer.Close(); cerr != nil { + if jr.collectorLn != nil { + if cerr := jr.collectorLn.Close(); cerr != nil { errs = append(errs, cerr) } - jr.collectorServer = nil + jr.collectorLn = nil } - if jr.tchannel != nil { - jr.tchannel.Close() - jr.tchannel = nil + if jr.tchannelLn != nil && false { + // Not invoking jr.tchannelLn.Close() because + // the Jaeger listener invokes os.Exit(1) which + // cannot be caught and will shut down the entire + // program. + if terr := jr.tchannelLn.Close(); terr != nil { + errs = append(errs, terr) + } + jr.tchannelLn = nil } if len(errs) == 0 { err = nil @@ -160,7 +166,7 @@ func (jr *jReceiver) StopTraceReception(ctx context.Context) error { // Otherwise combine all these errors buf := new(bytes.Buffer) for _, err := range errs { - fmt.Fprintf(buf, "%s\n", err.Error()) + fmt.Fprint(buf, err.Error()) } err = errors.New(buf.String()) }) diff --git a/translator/trace/jaegerthrift_to_protospan_test.go b/translator/trace/jaegerthrift_to_protospan_test.go index a99c96fe..f738b036 100644 --- a/translator/trace/jaegerthrift_to_protospan_test.go +++ b/translator/trace/jaegerthrift_to_protospan_test.go @@ -36,13 +36,13 @@ func TestJaegerThriftBatchToOCProto(t *testing.T) { thriftInFile := fmt.Sprintf("./testdata/thrift_batch_%02d.json", i) jb := &jaeger.Batch{} if err := loadFromJSON(thriftInFile, jb); err != nil { - t.Errorf("Failed load Jaeger Thrift from %q. Error: %v", thriftInFile, err) + t.Errorf("Failed load Jaeger Thrift from %q: %v", thriftInFile, err) continue } octrace, err := JaegerThriftBatchToOCProto(jb) if err != nil { - t.Errorf("Failed to handled Jaeger Thrift Batch from %q. Error: %v", thriftInFile, err) + t.Errorf("Failed to handled Jaeger Thrift Batch from %q: %v", thriftInFile, err) continue } @@ -54,14 +54,14 @@ func TestJaegerThriftBatchToOCProto(t *testing.T) { gb, err := json.MarshalIndent(octrace, "", " ") if err != nil { - t.Errorf("Failed to convert received OC proto to json. Error: %v", err) + t.Errorf("Failed to convert received OC proto to json: %v", err) continue } protoFile := fmt.Sprintf("./testdata/ocproto_batch_%02d.json", i) wb, err := ioutil.ReadFile(protoFile) if err != nil { - t.Errorf("Failed to read file %q with expected OC proto in JSON format. Error: %v", protoFile, err) + t.Errorf("Failed to read file %q with expected OC proto in JSON format: %v", protoFile, err) continue } @@ -74,11 +74,11 @@ func TestJaegerThriftBatchToOCProto(t *testing.T) { func loadFromJSON(file string, obj interface{}) error { blob, err := ioutil.ReadFile(file) - if err == nil { - err = json.Unmarshal(blob, obj) + if err != nil { + return err } - return err + return json.Unmarshal(blob, obj) } // This test ensures that we conservatively allocate, only creating memory when necessary. From 60c1ca9a2ac6e031501d0faedaf72a5587597f7a Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Wed, 14 Nov 2018 18:42:56 -0800 Subject: [PATCH 2/6] Fix incorrect rebase --- receiver/jaeger/trace_receiver.go | 34 ++++++++----------- .../trace/jaegerthrift_to_protospan_test.go | 14 ++++---- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/receiver/jaeger/trace_receiver.go b/receiver/jaeger/trace_receiver.go index 0f094ee2..96893afb 100644 --- a/receiver/jaeger/trace_receiver.go +++ b/receiver/jaeger/trace_receiver.go @@ -48,8 +48,8 @@ type jReceiver struct { tchannelPort int collectorHTTPPort int - tchannelLn net.Listener - collectorLn net.Listener + tchannel *tchannel.Channel + collectorServer *http.Server } const ( @@ -107,14 +107,14 @@ func (jr *jReceiver) StartTraceReception(ctx context.Context, spanSink spansink. return } tch.Serve(tln) - jr.tchannelLn = tln + jr.tchannel = tch // Now the collector that runs over HTTP caddr := jr.collectorAddr() cln, cerr := net.Listen("tcp", caddr) if cerr != nil { - // Abort and close tln - _ = tln.Close() + // Abort and close tch + tch.Close() err = fmt.Errorf("Failed to bind to Collector address %q: %v", caddr, cerr) return } @@ -122,10 +122,10 @@ func (jr *jReceiver) StartTraceReception(ctx context.Context, spanSink spansink. nr := mux.NewRouter() apiHandler := app.NewAPIHandler(jr) apiHandler.RegisterRoutes(nr) + jr.collectorServer = &http.Server{Handler: nr} go func() { - _ = http.Serve(cln, nr) + _ = jr.collectorServer.Serve(cln) }() - jr.collectorLn = cln // Otherwise no error was encountered, // finally set the spanSink @@ -142,21 +142,15 @@ func (jr *jReceiver) StopTraceReception(ctx context.Context) error { var err = errAlreadyStopped jr.stopOnce.Do(func() { var errs []error - if jr.collectorLn != nil { - if cerr := jr.collectorLn.Close(); cerr != nil { + if jr.collectorServer != nil { + if cerr := jr.collectorServer.Close(); cerr != nil { errs = append(errs, cerr) } - jr.collectorLn = nil + jr.collectorServer = nil } - if jr.tchannelLn != nil && false { - // Not invoking jr.tchannelLn.Close() because - // the Jaeger listener invokes os.Exit(1) which - // cannot be caught and will shut down the entire - // program. - if terr := jr.tchannelLn.Close(); terr != nil { - errs = append(errs, terr) - } - jr.tchannelLn = nil + if jr.tchannel != nil { + jr.tchannel.Close() + jr.tchannel = nil } if len(errs) == 0 { err = nil @@ -166,7 +160,7 @@ func (jr *jReceiver) StopTraceReception(ctx context.Context) error { // Otherwise combine all these errors buf := new(bytes.Buffer) for _, err := range errs { - fmt.Fprint(buf, err.Error()) + fmt.Fprintf(buf, "%s\n", err.Error()) } err = errors.New(buf.String()) }) diff --git a/translator/trace/jaegerthrift_to_protospan_test.go b/translator/trace/jaegerthrift_to_protospan_test.go index f738b036..a99c96fe 100644 --- a/translator/trace/jaegerthrift_to_protospan_test.go +++ b/translator/trace/jaegerthrift_to_protospan_test.go @@ -36,13 +36,13 @@ func TestJaegerThriftBatchToOCProto(t *testing.T) { thriftInFile := fmt.Sprintf("./testdata/thrift_batch_%02d.json", i) jb := &jaeger.Batch{} if err := loadFromJSON(thriftInFile, jb); err != nil { - t.Errorf("Failed load Jaeger Thrift from %q: %v", thriftInFile, err) + t.Errorf("Failed load Jaeger Thrift from %q. Error: %v", thriftInFile, err) continue } octrace, err := JaegerThriftBatchToOCProto(jb) if err != nil { - t.Errorf("Failed to handled Jaeger Thrift Batch from %q: %v", thriftInFile, err) + t.Errorf("Failed to handled Jaeger Thrift Batch from %q. Error: %v", thriftInFile, err) continue } @@ -54,14 +54,14 @@ func TestJaegerThriftBatchToOCProto(t *testing.T) { gb, err := json.MarshalIndent(octrace, "", " ") if err != nil { - t.Errorf("Failed to convert received OC proto to json: %v", err) + t.Errorf("Failed to convert received OC proto to json. Error: %v", err) continue } protoFile := fmt.Sprintf("./testdata/ocproto_batch_%02d.json", i) wb, err := ioutil.ReadFile(protoFile) if err != nil { - t.Errorf("Failed to read file %q with expected OC proto in JSON format: %v", protoFile, err) + t.Errorf("Failed to read file %q with expected OC proto in JSON format. Error: %v", protoFile, err) continue } @@ -74,11 +74,11 @@ func TestJaegerThriftBatchToOCProto(t *testing.T) { func loadFromJSON(file string, obj interface{}) error { blob, err := ioutil.ReadFile(file) - if err != nil { - return err + if err == nil { + err = json.Unmarshal(blob, obj) } - return json.Unmarshal(blob, obj) + return err } // This test ensures that we conservatively allocate, only creating memory when necessary. From 20b95828d18f55c2895448cfea846c7ccfad0f55 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Thu, 15 Nov 2018 14:04:29 -0800 Subject: [PATCH 3/6] PR feedback --- README.md | 6 ++-- cmd/occollector/app/builder/builder.go | 24 +++++++------- cmd/occollector/app/builder/builder_test.go | 18 +++++----- cmd/occollector/app/collector/collector.go | 33 ++++++++++++------- internal/collector/jaeger/receiver.go | 24 +++++++++----- internal/collector/opencensus/receiver.go | 15 +++++---- .../processor/multi_processor_test.go | 6 +++- internal/collector/zipkin/receiver.go | 7 ++-- 8 files changed, 77 insertions(+), 56 deletions(-) diff --git a/README.md b/README.md index 23e54ee8..c1302a17 100644 --- a/README.md +++ b/README.md @@ -319,9 +319,9 @@ Flags: -h, --help help for occollector --log-level string Output level of logs (TRACE, DEBUG, INFO, WARN, ERROR, FATAL) (default "INFO") --noop-processor Flag to add the no-op processor (combine with log level DEBUG to log incoming spans) - --receive-jaeger Flag to run the Jaeger receiver (i.e.: Jaeger Collector) - --receive-oc Flag to run the OpenCensus receiver - --receive-zipkin Flag to run the Zipkin receiver + --receive-jaeger Flag to run the Jaeger receiver (i.e.: Jaeger Collector), default settings: {ThriftTChannelPort:14267 ThriftHTTPPort:14268} + --receive-oc-trace Flag to run the OpenCensus trace receiver, default settings: {Port:55678} + --receive-zipkin Flag to run the Zipkin receiver, default settings: {Port:9411} ``` [travis-image]: https://travis-ci.org/census-instrumentation/opencensus-service.svg?branch=master diff --git a/cmd/occollector/app/builder/builder.go b/cmd/occollector/app/builder/builder.go index 9b711dce..de8e753c 100644 --- a/cmd/occollector/app/builder/builder.go +++ b/cmd/occollector/app/builder/builder.go @@ -32,10 +32,10 @@ const ( // JaegerReceiverCfg holds configuration for Jaeger receivers. type JaegerReceiverCfg struct { - // JaegerThriftTChannelPort is the port that the relay receives on for jaeger thrift tchannel requests - JaegerThriftTChannelPort int `mapstructure:"thrift-tchannel-port"` - // ReceiverJaegerHTTPPort is the port that the relay receives on for jaeger thrift http requests - JaegerThriftHTTPPort int `mapstructure:"thrift-http-port"` + // ThriftTChannelPort is the port that the relay receives on for jaeger thrift tchannel requests + ThriftTChannelPort int `mapstructure:"thrift-tchannel-port"` + // ThriftHTTPPort is the port that the relay receives on for jaeger thrift http requests + ThriftHTTPPort int `mapstructure:"thrift-http-port"` } // JaegerReceiverEnabled checks if the Jaeger receiver is enabled, via a command-line flag, environment @@ -44,11 +44,11 @@ func JaegerReceiverEnabled(v *viper.Viper, cmdFlag string) bool { return featureEnabled(v, cmdFlag, receiversRoot, jaegerEntry) } -// NewJaegerReceiverCfg returns an instance of JaegerReceiverCfg with default values -func NewJaegerReceiverCfg() *JaegerReceiverCfg { +// NewDefaultJaegerReceiverCfg returns an instance of JaegerReceiverCfg with default values +func NewDefaultJaegerReceiverCfg() *JaegerReceiverCfg { opts := &JaegerReceiverCfg{ - JaegerThriftTChannelPort: 14267, - JaegerThriftHTTPPort: 14268, + ThriftTChannelPort: 14267, + ThriftHTTPPort: 14268, } return opts } @@ -70,8 +70,8 @@ func OpenCensusReceiverEnabled(v *viper.Viper, cmdFlag string) bool { return featureEnabled(v, cmdFlag, receiversRoot, opencensusEntry) } -// NewOpenCensusReceiverCfg returns an instance of OpenCensusReceiverCfg with default values -func NewOpenCensusReceiverCfg() *OpenCensusReceiverCfg { +// NewDefaultOpenCensusReceiverCfg returns an instance of OpenCensusReceiverCfg with default values +func NewDefaultOpenCensusReceiverCfg() *OpenCensusReceiverCfg { opts := &OpenCensusReceiverCfg{ Port: 55678, } @@ -95,8 +95,8 @@ func ZipkinReceiverEnabled(v *viper.Viper, cmdFlag string) bool { return featureEnabled(v, cmdFlag, receiversRoot, zipkinEntry) } -// NewZipkinReceiverCfg returns an instance of ZipkinReceiverCfg with default values -func NewZipkinReceiverCfg() *ZipkinReceiverCfg { +// NewDefaultZipkinReceiverCfg returns an instance of ZipkinReceiverCfg with default values +func NewDefaultZipkinReceiverCfg() *ZipkinReceiverCfg { opts := &ZipkinReceiverCfg{ Port: 9411, } diff --git a/cmd/occollector/app/builder/builder_test.go b/cmd/occollector/app/builder/builder_test.go index df68b6fb..4f19c1eb 100644 --- a/cmd/occollector/app/builder/builder_test.go +++ b/cmd/occollector/app/builder/builder_test.go @@ -27,12 +27,12 @@ func TestReceiversEnabledByPresenceWithDefaultSettings(t *testing.T) { t.Fatalf("Failed to load viper from test file: %v", err) } - j, oc, z := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z") - if !j || !oc || !z { - t.Fatalf("Some of the expected receivers were not enabled j:%v oc:%v z:%v", j, oc, z) + jaegerEnabled, opencensusEnabled, zipkinEnabled := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z") + if !jaegerEnabled || !opencensusEnabled || !zipkinEnabled { + t.Fatalf("Some of the expected receivers were not enabled j:%v oc:%v z:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled) } - wj := NewJaegerReceiverCfg() + wj := NewDefaultJaegerReceiverCfg() gj, err := wj.InitFromViper(v) if err != nil { t.Errorf("Failed to InitFromViper for Jaeger receiver: %v", err) @@ -40,7 +40,7 @@ func TestReceiversEnabledByPresenceWithDefaultSettings(t *testing.T) { t.Errorf("Incorrect config for Jaeger receiver, want %v got %v", wj, gj) } - woc := NewOpenCensusReceiverCfg() + woc := NewDefaultOpenCensusReceiverCfg() goc, err := woc.InitFromViper(v) if err != nil { t.Errorf("Failed to InitFromViper for OpenCensus receiver: %v", err) @@ -48,7 +48,7 @@ func TestReceiversEnabledByPresenceWithDefaultSettings(t *testing.T) { t.Errorf("Incorrect config for OpenCensus receiver, want %v got %v", woc, goc) } - wz := NewZipkinReceiverCfg() + wz := NewDefaultZipkinReceiverCfg() gz, err := wz.InitFromViper(v) if err != nil { t.Errorf("Failed to InitFromViper for Zipkin receiver: %v", err) @@ -63,9 +63,9 @@ func TestReceiversDisabledByPresenceWithDefaultSettings(t *testing.T) { t.Fatalf("Failed to load viper from test file: %v", err) } - j, oc, z := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z") - if j || oc || z { - t.Fatalf("Not all receivers were disabled j:%v oc:%v z:%v", j, oc, z) + jaegerEnabled, opencensusEnabled, zipkinEnabled := JaegerReceiverEnabled(v, "j"), OpenCensusReceiverEnabled(v, "oc"), ZipkinReceiverEnabled(v, "z") + if jaegerEnabled || opencensusEnabled || zipkinEnabled { + t.Fatalf("Not all receivers were disabled j:%v oc:%v z:%v", jaegerEnabled, opencensusEnabled, zipkinEnabled) } } diff --git a/cmd/occollector/app/collector/collector.go b/cmd/occollector/app/collector/collector.go index 85fa1d22..a30c886e 100644 --- a/cmd/occollector/app/collector/collector.go +++ b/cmd/occollector/app/collector/collector.go @@ -16,6 +16,7 @@ package collector import ( + "fmt" "io/ioutil" "log" "os" @@ -23,6 +24,11 @@ import ( "strings" "syscall" + "github.com/spf13/cobra" + "github.com/spf13/viper" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" "github.com/census-instrumentation/opencensus-service/exporter" "github.com/census-instrumentation/opencensus-service/exporter/exporterparser" @@ -30,17 +36,13 @@ import ( "github.com/census-instrumentation/opencensus-service/internal/collector/opencensus" "github.com/census-instrumentation/opencensus-service/internal/collector/processor" "github.com/census-instrumentation/opencensus-service/internal/collector/zipkin" - "github.com/spf13/cobra" - "github.com/spf13/viper" - "go.uber.org/zap" - "go.uber.org/zap/zapcore" ) const ( configCfg = "config" logLevelCfg = "log-level" jaegerReceiverFlg = "receive-jaeger" - ocReceiverFlg = "receive-oc" + ocReceiverFlg = "receive-oc-trace" zipkinReceiverFlg = "receive-zipkin" noopProcessorFlg = "noop-processor" queuedProcessorFlg = "add-queued-processor" // TODO: (@pjanotti) this is temporary flag until it can be read from config. @@ -77,9 +79,12 @@ func init() { // local flags rootCmd.Flags().StringVar(&config, configCfg, "", "Path to the config file") - rootCmd.Flags().Bool(jaegerReceiverFlg, false, "Flag to run the Jaeger receiver (i.e.: Jaeger Collector)") - rootCmd.Flags().Bool(ocReceiverFlg, false, "Flag to run the OpenCensus receiver") - rootCmd.Flags().Bool(zipkinReceiverFlg, false, "Flag to run the Zipkin receiver") + rootCmd.Flags().Bool(jaegerReceiverFlg, false, + fmt.Sprintf("Flag to run the Jaeger receiver (i.e.: Jaeger Collector), default settings: %+v", *builder.NewDefaultJaegerReceiverCfg())) + rootCmd.Flags().Bool(ocReceiverFlg, false, + fmt.Sprintf("Flag to run the OpenCensus trace receiver, default settings: %+v", *builder.NewDefaultOpenCensusReceiverCfg())) + rootCmd.Flags().Bool(zipkinReceiverFlg, false, + fmt.Sprintf("Flag to run the Zipkin receiver, default settings: %+v", *builder.NewDefaultZipkinReceiverCfg())) rootCmd.Flags().Bool(noopProcessorFlg, false, "Flag to add the no-op processor (combine with log level DEBUG to log incoming spans)") rootCmd.Flags().Bool(queuedProcessorFlg, false, "Flag to wrap one processor with the queued processor (flag will be remove soon, dev helper)") @@ -224,9 +229,9 @@ func createExporters() (doneFns []func(), traceExporters []exporter.TraceExporte func createReceivers(spanProcessor processor.SpanProcessor) (closeFns []func()) { var someReceiverEnabled bool receivers := []struct { - name string - runFn func(*zap.Logger, *viper.Viper, processor.SpanProcessor) (func(), error) - ok bool + name string + runFn func(*zap.Logger, *viper.Viper, processor.SpanProcessor) (func(), error) + enabled bool }{ {"Jaeger", jaegerreceiver.Run, builder.JaegerReceiverEnabled(v, jaegerReceiverFlg)}, {"OpenCensus", ocreceiver.Run, builder.OpenCensusReceiverEnabled(v, ocReceiverFlg)}, @@ -234,9 +239,13 @@ func createReceivers(spanProcessor processor.SpanProcessor) (closeFns []func()) } for _, receiver := range receivers { - if receiver.ok { + if receiver.enabled { closeSrv, err := receiver.runFn(logger, v, spanProcessor) if err != nil { + // TODO: (@pjanotti) better shutdown, for now just try to stop any started receiver before terminating. + for _, closeFn := range closeFns { + closeFn() + } logger.Fatal("Cannot run receiver for "+receiver.name, zap.Error(err)) } closeFns = append(closeFns, closeSrv) diff --git a/internal/collector/jaeger/receiver.go b/internal/collector/jaeger/receiver.go index 004996ef..f91bb30d 100644 --- a/internal/collector/jaeger/receiver.go +++ b/internal/collector/jaeger/receiver.go @@ -13,29 +13,35 @@ // limitations under the License. // Package jaegerreceiver wraps the functionality to start the end-point that -// receives data directly in the Jaeger format as jaeger-collector (UDP as -// jaeger-agent currently is not supported). +// receives Jaeger data sent by the jaeger-agent in jaeger.thrift format over +// TChannel and directly from clients in jaeger.thrift format over binary thrift +// protocol (HTTP transport). +// Note that the UDP transport is not supported since these protocol/transport +// are for task->jaeger-agent communication only and the receiver does not try to +// support jaeger-agent endpoints. +// TODO: add support for the jaeger proto endpoint released in jaeger 1.8package jaegerreceiver package jaegerreceiver import ( "context" + "github.com/spf13/viper" + "go.uber.org/zap" + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" "github.com/census-instrumentation/opencensus-service/internal/collector/processor" "github.com/census-instrumentation/opencensus-service/receiver/jaeger" - "github.com/spf13/viper" - "go.uber.org/zap" ) -// Run starts the OpenCensus receiver endpoint. +// Run starts the Jaeger receiver endpoint. func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { - rOpts, err := builder.NewJaegerReceiverCfg().InitFromViper(v) + rOpts, err := builder.NewDefaultJaegerReceiverCfg().InitFromViper(v) if err != nil { return nil, err } ctx := context.Background() - jtr, err := jaeger.New(ctx, rOpts.JaegerThriftTChannelPort, rOpts.JaegerThriftHTTPPort) + jtr, err := jaeger.New(ctx, rOpts.ThriftTChannelPort, rOpts.ThriftHTTPPort) if err != nil { return nil, err } @@ -46,8 +52,8 @@ func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) ( } logger.Info("Jaeger receiver is running.", - zap.Int("thrift-tchannel-port", rOpts.JaegerThriftTChannelPort), - zap.Int("thrift-http-port", rOpts.JaegerThriftHTTPPort)) + zap.Int("thrift-tchannel-port", rOpts.ThriftTChannelPort), + zap.Int("thrift-http-port", rOpts.ThriftHTTPPort)) closeFn := func() { jtr.StopTraceReception(context.Background()) diff --git a/internal/collector/opencensus/receiver.go b/internal/collector/opencensus/receiver.go index 4d4877c4..5571b9da 100644 --- a/internal/collector/opencensus/receiver.go +++ b/internal/collector/opencensus/receiver.go @@ -22,25 +22,26 @@ import ( "strconv" "time" - agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" - "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" - "github.com/census-instrumentation/opencensus-service/internal/collector/processor" - "github.com/census-instrumentation/opencensus-service/receiver/opencensus/octrace" "github.com/spf13/viper" "go.opencensus.io/plugin/ocgrpc" "go.opencensus.io/stats/view" "go.uber.org/zap" - "google.golang.org/grpc" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" + "github.com/census-instrumentation/opencensus-service/internal" + "github.com/census-instrumentation/opencensus-service/internal/collector/processor" + "github.com/census-instrumentation/opencensus-service/receiver/opencensus/octrace" ) // Run starts the OpenCensus receiver endpoint. func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { - rOpts, err := builder.NewOpenCensusReceiverCfg().InitFromViper(v) + rOpts, err := builder.NewDefaultOpenCensusReceiverCfg().InitFromViper(v) if err != nil { return nil, err } - grpcSrv := grpc.NewServer() + grpcSrv := internal.GRPCServerWithObservabilityEnabled() if err := view.Register(ocgrpc.DefaultServerViews...); err != nil { return nil, fmt.Errorf("Failed to register ocgrpc.DefaultServerViews: %v", err) diff --git a/internal/collector/processor/multi_processor_test.go b/internal/collector/processor/multi_processor_test.go index 2b9ff302..7330fed8 100644 --- a/internal/collector/processor/multi_processor_test.go +++ b/internal/collector/processor/multi_processor_test.go @@ -60,8 +60,12 @@ func TestMultiSpanProcessorSomeNotOk(t *testing.T) { m.Failures = wantFailures tt := NewMultiSpanProcessor(processors...) + spans := make([]*tracepb.Span, wantFailures+3) + for i := range spans { + spans[i] = &tracepb.Span{} + } batch := &agenttracepb.ExportTraceServiceRequest{ - Spans: make([]*tracepb.Span, wantFailures+3), + Spans: spans, } var wantSpansCount = 0 diff --git a/internal/collector/zipkin/receiver.go b/internal/collector/zipkin/receiver.go index 19f530a5..69a223db 100644 --- a/internal/collector/zipkin/receiver.go +++ b/internal/collector/zipkin/receiver.go @@ -22,16 +22,17 @@ import ( "net/http" "strconv" + "github.com/spf13/viper" + "go.uber.org/zap" + "github.com/census-instrumentation/opencensus-service/cmd/occollector/app/builder" "github.com/census-instrumentation/opencensus-service/internal/collector/processor" zr "github.com/census-instrumentation/opencensus-service/receiver/zipkin" - "github.com/spf13/viper" - "go.uber.org/zap" ) // Run starts the Zipkin receiver endpoint. func Run(logger *zap.Logger, v *viper.Viper, spanProc processor.SpanProcessor) (func(), error) { - rOpts, err := builder.NewZipkinReceiverCfg().InitFromViper(v) + rOpts, err := builder.NewDefaultZipkinReceiverCfg().InitFromViper(v) if err != nil { return nil, err } From a397f06e86dd610e717eaeef7db8fb053b1d62c1 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Thu, 15 Nov 2018 17:21:18 -0800 Subject: [PATCH 4/6] PR feedback 2 --- internal/collector/processor/queued_processor.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/internal/collector/processor/queued_processor.go b/internal/collector/processor/queued_processor.go index 48b71523..7fd85161 100644 --- a/internal/collector/processor/queued_processor.go +++ b/internal/collector/processor/queued_processor.go @@ -17,17 +17,14 @@ package processor import ( "time" - agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" "github.com/jaegertracing/jaeger/pkg/queue" "go.uber.org/zap" + + agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" ) type queuedSpanProcessor struct { - queue *queue.BoundedQueue - /* TODO: (@pauloja) not doing metrics for now - metrics *cApp.SpanProcessorMetrics - batchMetrics *processorBatchMetrics - */ + queue *queue.BoundedQueue logger *zap.Logger sender SpanProcessor numWorkers int @@ -114,9 +111,9 @@ func (sp *queuedSpanProcessor) processItemFromQueue(item *queueItem) { sp.logger.Error("Failed to process batch, discarding", zap.Int("batch-size", batchSize)) sp.onItemDropped(item) } else { - // try to put it back at the end of queue for retry at a later time - addedToQueue := sp.queue.Produce(item) - if !addedToQueue { + // TODO: (@pjanotti) do not put it back on the end of the queue, retry with it directly. + // This will have the benefit of keeping the batch closer to related ones in time. + if !sp.queue.Produce(item) { sp.logger.Error("Failed to process batch and failed to re-enqueue", zap.Int("batch-size", batchSize)) sp.onItemDropped(item) } else { From 763bf6df945eac91bf9de02fd4b31b1cb51d147b Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Fri, 16 Nov 2018 15:03:51 -0800 Subject: [PATCH 5/6] PR Feedback --- CONTRIBUTING.md | 4 +-- cmd/occollector/app/collector/collector.go | 6 ++-- .../collector/processor/multi_processor.go | 13 --------- internal/collector/processor/options.go | 28 +++++++++---------- internal/collector/processor/processor.go | 14 +++++----- .../collector/processor/queued_processor.go | 25 +++++++++-------- 6 files changed, 39 insertions(+), 51 deletions(-) diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 703d4747..0bf13ced 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -61,5 +61,5 @@ This project uses go 1.11 and Travis for CI. Travis CI uses the Makefile with the default target, it is recommended to run it before submitting your PR. It runs `gofmt -s` (simplify) and `golint`. -The depencies are managed with `go mod` if you work with the sources under your -`$GOPATH` you need to set the environment variable `GO111MODULE` to `on`. \ No newline at end of file +The dependencies are managed with `go mod` if you work with the sources under your +`$GOPATH` you need to set the environment variable `GO111MODULE=on`. \ No newline at end of file diff --git a/cmd/occollector/app/collector/collector.go b/cmd/occollector/app/collector/collector.go index a30c886e..dc3cae6b 100644 --- a/cmd/occollector/app/collector/collector.go +++ b/cmd/occollector/app/collector/collector.go @@ -44,7 +44,7 @@ const ( jaegerReceiverFlg = "receive-jaeger" ocReceiverFlg = "receive-oc-trace" zipkinReceiverFlg = "receive-zipkin" - noopProcessorFlg = "noop-processor" + debugProcessorFlg = "debug-processor" queuedProcessorFlg = "add-queued-processor" // TODO: (@pjanotti) this is temporary flag until it can be read from config. ) @@ -85,7 +85,7 @@ func init() { fmt.Sprintf("Flag to run the OpenCensus trace receiver, default settings: %+v", *builder.NewDefaultOpenCensusReceiverCfg())) rootCmd.Flags().Bool(zipkinReceiverFlg, false, fmt.Sprintf("Flag to run the Zipkin receiver, default settings: %+v", *builder.NewDefaultZipkinReceiverCfg())) - rootCmd.Flags().Bool(noopProcessorFlg, false, "Flag to add the no-op processor (combine with log level DEBUG to log incoming spans)") + rootCmd.Flags().Bool(debugProcessorFlg, false, "Flag to add a debug processor (combine with log level DEBUG to log incoming spans)") rootCmd.Flags().Bool(queuedProcessorFlg, false, "Flag to wrap one processor with the queued processor (flag will be remove soon, dev helper)") // TODO: (@pjanotti) add builder options as flags, before calls bellow. Likely it will require code re-org. @@ -134,7 +134,7 @@ func execute() { spanProcessors = append(spanProcessors, processor.NewTraceExporterProcessor(exporters...)) } - if v.GetBool(noopProcessorFlg) { + if v.GetBool(debugProcessorFlg) { spanProcessors = append(spanProcessors, processor.NewNoopSpanProcessor(logger)) } diff --git a/internal/collector/processor/multi_processor.go b/internal/collector/processor/multi_processor.go index 2dff6eed..4f81effe 100644 --- a/internal/collector/processor/multi_processor.go +++ b/internal/collector/processor/multi_processor.go @@ -18,17 +18,9 @@ import ( "fmt" "strings" - commonpb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/common/v1" agenttracepb "github.com/census-instrumentation/opencensus-proto/gen-go/agent/trace/v1" ) -var ( - nilNodeReplacement = &commonpb.Node{ - Identifier: &commonpb.ProcessIdentifier{HostName: ""}, - ServiceInfo: &commonpb.ServiceInfo{Name: ""}, - } -) - // MultiSpanProcessor enables processing on multiple processors. // For each incoming span batch, it calls ProcessSpans method on each span // processor one-by-one. It aggregates success/failures/errors from all of @@ -45,11 +37,6 @@ func NewMultiSpanProcessor(procs ...SpanProcessor) MultiSpanProcessor { // ProcessSpans implements the SpanProcessor interface func (msp MultiSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { - // TODO: (@pjanotti) receivers should never pass batches with nil Node but that happens, for now fill with replacement. - if batch.Node == nil { - batch.Node = nilNodeReplacement - } - var maxFailures uint64 var errors []error for _, sp := range msp { diff --git a/internal/collector/processor/options.go b/internal/collector/processor/options.go index 5e1c39c1..37de9129 100644 --- a/internal/collector/processor/options.go +++ b/internal/collector/processor/options.go @@ -43,50 +43,50 @@ type Option func(c *options) // Options is a factory for all available Option's var Options options -// Logger creates a Option that initializes the logger -func (options) Logger(logger *zap.Logger) Option { +// WithLogger creates a Option that initializes the logger +func (options) WithLogger(logger *zap.Logger) Option { return func(b *options) { b.logger = logger } } -// Name creates an Option that initializes the name of the processor -func (options) Name(name string) Option { +// WithName creates an Option that initializes the name of the processor +func (options) WithName(name string) Option { return func(b *options) { b.name = name } } -// NumWorkers creates an Option that initializes the number of queue consumers AKA workers -func (options) NumWorkers(numWorkers int) Option { +// WithNumWorkers creates an Option that initializes the number of queue consumers AKA workers +func (options) WithNumWorkers(numWorkers int) Option { return func(b *options) { b.numWorkers = numWorkers } } -// QueueSize creates an Option that initializes the queue size -func (options) QueueSize(queueSize int) Option { +// WithQueueSize creates an Option that initializes the queue size +func (options) WithQueueSize(queueSize int) Option { return func(b *options) { b.queueSize = queueSize } } -// BackoffDelay creates an Option that initializes the backoff delay -func (options) BackoffDelay(backoffDelay time.Duration) Option { +// WithBackoffDelay creates an Option that initializes the backoff delay +func (options) WithBackoffDelay(backoffDelay time.Duration) Option { return func(b *options) { b.backoffDelay = backoffDelay } } -// ExtraFormatTypes creates an Option that initializes the extra list of format types -func (options) ExtraFormatTypes(extraFormatTypes []string) Option { +// WithExtraFormatTypes creates an Option that initializes the extra list of format types +func (options) WithExtraFormatTypes(extraFormatTypes []string) Option { return func(b *options) { b.extraFormatTypes = extraFormatTypes } } -// RetryOnProcessingFailures creates an Option that initializes the retryOnProcessingFailure boolean -func (options) RetryOnProcessingFailures(retryOnProcessingFailure bool) Option { +// WithRetryOnProcessingFailures creates an Option that initializes the retryOnProcessingFailure boolean +func (options) WithRetryOnProcessingFailures(retryOnProcessingFailure bool) Option { return func(b *options) { b.retryOnProcessingFailure = retryOnProcessingFailure } diff --git a/internal/collector/processor/processor.go b/internal/collector/processor/processor.go index 5e1707af..e91f1ed3 100644 --- a/internal/collector/processor/processor.go +++ b/internal/collector/processor/processor.go @@ -22,26 +22,26 @@ import ( // SpanProcessor handles batches of spans converted to OpenCensus proto format. type SpanProcessor interface { - // ProcessSpans processes spans and return with either a list of true/false success or an error + // ProcessSpans processes spans and return with the number of spans that failed and an error. ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) // TODO: (@pjanotti) For shutdown improvement, the interface needs a method to attempt that. } -// An initial processor that performs no operation. -type noopSpanProcessor struct{ logger *zap.Logger } +// An initial processor that does not sends the data to any destination but helps debugging. +type debugSpanProcessor struct{ logger *zap.Logger } -var _ SpanProcessor = (*noopSpanProcessor)(nil) +var _ SpanProcessor = (*debugSpanProcessor)(nil) -func (sp *noopSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { +func (sp *debugSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { if batch.Node == nil { sp.logger.Warn("Received batch with nil Node", zap.String("format", spanFormat)) } - sp.logger.Debug("noopSpanProcessor", zap.String("originalFormat", spanFormat), zap.Int("#spans", len(batch.Spans))) + sp.logger.Debug("debugSpanProcessor", zap.String("originalFormat", spanFormat), zap.Int("#spans", len(batch.Spans))) return 0, nil } // NewNoopSpanProcessor creates an OC SpanProcessor that just drops the received data. func NewNoopSpanProcessor(logger *zap.Logger) SpanProcessor { - return &noopSpanProcessor{logger: logger} + return &debugSpanProcessor{logger: logger} } diff --git a/internal/collector/processor/queued_processor.go b/internal/collector/processor/queued_processor.go index 7fd85161..98563e5b 100644 --- a/internal/collector/processor/queued_processor.go +++ b/internal/collector/processor/queued_processor.go @@ -15,6 +15,7 @@ package processor import ( + "sync" "time" "github.com/jaegertracing/jaeger/pkg/queue" @@ -31,6 +32,7 @@ type queuedSpanProcessor struct { retryOnProcessingFailure bool backoffDelay time.Duration stopCh chan struct{} + stopOnce sync.Once } var _ SpanProcessor = (*queuedSpanProcessor)(nil) @@ -55,10 +57,7 @@ func NewQueuedSpanProcessor(sender SpanProcessor, opts ...Option) SpanProcessor return sp } -func newQueuedSpanProcessor( - sender SpanProcessor, - opts ...Option, -) *queuedSpanProcessor { +func newQueuedSpanProcessor(sender SpanProcessor, opts ...Option) *queuedSpanProcessor { options := Options.apply(opts...) boundedQueue := queue.NewBoundedQueue(options.queueSize, func(item interface{}) {}) return &queuedSpanProcessor{ @@ -72,19 +71,21 @@ func newQueuedSpanProcessor( } } -// Stop halts the span processor and all its go-routines. +// Stop halts the span processor and all its goroutines. func (sp *queuedSpanProcessor) Stop() { - close(sp.stopCh) - sp.queue.Stop() + sp.stopOnce.Do(func() { + close(sp.stopCh) + sp.queue.Stop() + }) } // ProcessSpans implements the SpanProcessor interface -func (sp *queuedSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (uint64, error) { - ok := sp.enqueueSpanBatch(batch, spanFormat) - if !ok { - return uint64(len(batch.Spans)), nil +func (sp *queuedSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) (failures uint64, err error) { + allAdded := sp.enqueueSpanBatch(batch, spanFormat) + if !allAdded { + failures = uint64(len(batch.Spans)) } - return 0, nil + return } func (sp *queuedSpanProcessor) enqueueSpanBatch(batch *agenttracepb.ExportTraceServiceRequest, spanFormat string) bool { From fdca5ae35077c0c7b8f37d4e7f95c32bd36ac009 Mon Sep 17 00:00:00 2001 From: Paulo Janotti Date: Fri, 16 Nov 2018 16:07:47 -0800 Subject: [PATCH 6/6] Test improvement --- .../processor/queued_processor_test.go | 21 ++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/internal/collector/processor/queued_processor_test.go b/internal/collector/processor/queued_processor_test.go index d8e7f091..1443a8e4 100644 --- a/internal/collector/processor/queued_processor_test.go +++ b/internal/collector/processor/queued_processor_test.go @@ -24,8 +24,7 @@ import ( ) func TestQueueProcessorHappyPath(t *testing.T) { - wg := &sync.WaitGroup{} - mockProc := &mockConcurrentSpanProcessor{waitGroup: wg} + mockProc := newMockConcurrentSpanProcessor() qp := NewQueuedSpanProcessor(mockProc) goFn := func(b *agenttracepb.ExportTraceServiceRequest) { qp.ProcessSpans(b, "test") @@ -33,7 +32,6 @@ func TestQueueProcessorHappyPath(t *testing.T) { spans := []*tracepb.Span{{}} wantBatches := 10 - wg.Add(wantBatches) wantSpans := 0 for i := 0; i < wantBatches; i++ { batch := &agenttracepb.ExportTraceServiceRequest{ @@ -41,11 +39,12 @@ func TestQueueProcessorHappyPath(t *testing.T) { } wantSpans += len(spans) spans = append(spans, &tracepb.Span{}) - go goFn(batch) + fn := func() { goFn(batch) } + mockProc.runConcurrently(fn) } // Wait until all batches received - wg.Wait() + mockProc.awaitAsyncProcessing() if wantBatches != int(mockProc.batchCount) { t.Fatalf("Wanted %d batches, got %d", wantBatches, mockProc.batchCount) @@ -69,3 +68,15 @@ func (p *mockConcurrentSpanProcessor) ProcessSpans(batch *agenttracepb.ExportTra p.waitGroup.Done() return 0, nil } + +func newMockConcurrentSpanProcessor() *mockConcurrentSpanProcessor { + return &mockConcurrentSpanProcessor{waitGroup: new(sync.WaitGroup)} +} +func (p *mockConcurrentSpanProcessor) runConcurrently(fn func()) { + p.waitGroup.Add(1) + go fn() +} + +func (p *mockConcurrentSpanProcessor) awaitAsyncProcessing() { + p.waitGroup.Wait() +}