From 482d71861d65d2238b0a023d6259be17091569c0 Mon Sep 17 00:00:00 2001 From: Bogdan Drutu Date: Thu, 8 Oct 2020 11:18:09 -0700 Subject: [PATCH] Enable queuing and retries for splunk hec exporter (#1222) * Enable queuing and retries for splunk hec exporter Signed-off-by: Bogdan Drutu * Update readme for queuing and retry Signed-off-by: Bogdan Drutu --- exporter/splunkhecexporter/README.md | 2 + exporter/splunkhecexporter/client.go | 12 ++--- exporter/splunkhecexporter/client_test.go | 46 +++++++++++++++---- exporter/splunkhecexporter/config.go | 11 ++--- exporter/splunkhecexporter/config_test.go | 16 ++++++- exporter/splunkhecexporter/exporter.go | 37 --------------- exporter/splunkhecexporter/exporter_test.go | 10 ++-- exporter/splunkhecexporter/factory.go | 43 ++++++++++++++--- .../splunkhecexporter/testdata/config.yaml | 10 ++++ 9 files changed, 116 insertions(+), 71 deletions(-) diff --git a/exporter/splunkhecexporter/README.md b/exporter/splunkhecexporter/README.md index 66753a022044..86574116a7c9 100644 --- a/exporter/splunkhecexporter/README.md +++ b/exporter/splunkhecexporter/README.md @@ -16,6 +16,8 @@ The following configuration options can also be configured: - `disable_compression` (default: false): Whether to disable gzip compression over HTTP. - `timeout` (default: 10s): HTTP timeout when sending data. - `insecure_skip_verify` (default: false): Whether to skip checking the certificate of the HEC endpoint when sending data over HTTPS. +- To configure queuing and retries see [here](https://github.com/open-telemetry/opentelemetry-collector/tree/master/exporter/exporterhelper#configuration) + Example: ```yaml diff --git a/exporter/splunkhecexporter/client.go b/exporter/splunkhecexporter/client.go index ba9ad8c0cb66..7e54a3e60297 100644 --- a/exporter/splunkhecexporter/client.go +++ b/exporter/splunkhecexporter/client.go @@ -46,7 +46,7 @@ type client struct { } func (c *client) pushMetricsData( - _ context.Context, + ctx context.Context, md pdata.Metrics, ) (droppedTimeSeries int, err error) { c.wg.Add(1) @@ -65,7 +65,7 @@ func (c *client) pushMetricsData( return numMetricPoint(md), consumererror.Permanent(err) } - req, err := http.NewRequest("POST", c.url.String(), body) + req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), body) if err != nil { return numMetricPoint(md), consumererror.Permanent(err) } @@ -110,7 +110,7 @@ func (c *client) pushTraceData( return numDroppedSpans, nil } - err = c.sendSplunkEvents(splunkEvents) + err = c.sendSplunkEvents(ctx, splunkEvents) if err != nil { return td.SpanCount(), err } @@ -118,13 +118,13 @@ func (c *client) pushTraceData( return numDroppedSpans, nil } -func (c *client) sendSplunkEvents(splunkEvents []*splunkEvent) error { +func (c *client) sendSplunkEvents(ctx context.Context, splunkEvents []*splunkEvent) error { body, compressed, err := encodeBodyEvents(&c.zippers, splunkEvents, c.config.DisableCompression) if err != nil { return consumererror.Permanent(err) } - req, err := http.NewRequest("POST", c.url.String(), body) + req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), body) if err != nil { return consumererror.Permanent(err) } @@ -165,7 +165,7 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (numDroppedLogs return numDroppedLogs, nil } - err = c.sendSplunkEvents(splunkEvents) + err = c.sendSplunkEvents(ctx, splunkEvents) if err != nil { return ld.LogRecordCount(), err } diff --git a/exporter/splunkhecexporter/client_test.go b/exporter/splunkhecexporter/client_test.go index 93bfedd2d9d3..fa69e3648950 100644 --- a/exporter/splunkhecexporter/client_test.go +++ b/exporter/splunkhecexporter/client_test.go @@ -174,6 +174,8 @@ func runMetricsExport(disableCompression bool, numberOfDataPoints int, t *testin params := component.ExporterCreateParams{Logger: zap.NewNop()} exporter, err := factory.CreateMetricsExporter(context.Background(), params, cfg) assert.NoError(t, err) + assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost())) + defer exporter.Shutdown(context.Background()) md := createMetricsData(numberOfDataPoints) @@ -210,6 +212,8 @@ func runTraceExport(disableCompression bool, numberOfTraces int, t *testing.T) ( params := component.ExporterCreateParams{Logger: zap.NewNop()} exporter, err := factory.CreateTraceExporter(context.Background(), params, cfg) assert.NoError(t, err) + assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost())) + defer exporter.Shutdown(context.Background()) td := createTraceData(numberOfTraces) @@ -246,6 +250,8 @@ func runLogExport(disableCompression bool, numberOfLogs int, t *testing.T) (stri params := component.ExporterCreateParams{Logger: zap.NewNop()} exporter, err := factory.CreateLogsExporter(context.Background(), params, cfg) assert.NoError(t, err) + assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost())) + defer exporter.Shutdown(context.Background()) ld := createLogData(numberOfLogs) @@ -330,14 +336,19 @@ func TestErrorReceived(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector" + // Disable QueueSettings to ensure that we execute the request when calling ConsumeTraces + // otherwise we will not see the error. + cfg.QueueSettings.Enabled = false + // Disable retries to not wait too much time for the return error. + cfg.RetrySettings.Enabled = false cfg.DisableCompression = true cfg.Token = "1234-1234" params := component.ExporterCreateParams{Logger: zap.NewNop()} exporter, err := factory.CreateTraceExporter(context.Background(), params, cfg) assert.NoError(t, err) - - assert.NoError(t, err) + assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost())) + defer exporter.Shutdown(context.Background()) td := createTraceData(3) @@ -368,11 +379,18 @@ func TestInvalidMetrics(t *testing.T) { func TestInvalidURL(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) + // Disable queuing to ensure that we execute the request when calling ConsumeTraces + // otherwise we will not see the error. + cfg.QueueSettings.Enabled = false + // Disable retries to not wait too much time for the return error. + cfg.RetrySettings.Enabled = false cfg.Endpoint = "ftp://example.com:134" cfg.Token = "1234-1234" params := component.ExporterCreateParams{Logger: zap.NewNop()} exporter, err := factory.CreateTraceExporter(context.Background(), params, cfg) assert.NoError(t, err) + assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost())) + defer exporter.Shutdown(context.Background()) td := createTraceData(2) @@ -417,17 +435,25 @@ func TestInvalidJsonClient(t *testing.T) { }, nil, } - c := client{url: nil, zippers: sync.Pool{New: func() interface{} { - return gzip.NewWriter(nil) - }}, config: &Config{Timeout: time.Microsecond}} - err := c.sendSplunkEvents(evs) + c := client{ + url: nil, + zippers: sync.Pool{New: func() interface{} { + return gzip.NewWriter(nil) + }}, + config: &Config{}, + } + err := c.sendSplunkEvents(context.Background(), evs) assert.EqualError(t, err, "Permanent error: json: unsupported value: +Inf") } func TestInvalidURLClient(t *testing.T) { - c := client{url: &url.URL{Host: "in va lid"}, zippers: sync.Pool{New: func() interface{} { - return gzip.NewWriter(nil) - }}, config: &Config{Timeout: time.Microsecond}} - err := c.sendSplunkEvents([]*splunkEvent{}) + c := client{ + url: &url.URL{Host: "in va lid"}, + zippers: sync.Pool{New: func() interface{} { + return gzip.NewWriter(nil) + }}, + config: &Config{}, + } + err := c.sendSplunkEvents(context.Background(), []*splunkEvent{}) assert.EqualError(t, err, "Permanent error: parse \"//in%20va%20lid\": invalid URL escape \"%20\"") } diff --git a/exporter/splunkhecexporter/config.go b/exporter/splunkhecexporter/config.go index 523644ae0cb8..384facde8188 100644 --- a/exporter/splunkhecexporter/config.go +++ b/exporter/splunkhecexporter/config.go @@ -19,9 +19,9 @@ import ( "fmt" "net/url" "path" - "time" "go.opentelemetry.io/collector/config/configmodels" + "go.opentelemetry.io/collector/exporter/exporterhelper" ) const ( @@ -31,7 +31,10 @@ const ( // Config defines configuration for Splunk exporter. type Config struct { - configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct. + exporterhelper.QueueSettings `mapstructure:"sending_queue"` + exporterhelper.RetrySettings `mapstructure:"retry_on_failure"` // HEC Token is the authentication token provided by Splunk. Token string `mapstructure:"token"` @@ -55,10 +58,6 @@ type Config struct { // Disable GZip compression. Defaults to false. DisableCompression bool `mapstructure:"disable_compression"` - // Timeout is the maximum timeout for HTTP request sending trace data. The - // default value is 10 seconds. - Timeout time.Duration `mapstructure:"timeout"` - // insecure_skip_verify skips checking the certificate of the HEC endpoint when sending data over HTTPS. Defaults to false. InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"` } diff --git a/exporter/splunkhecexporter/config_test.go b/exporter/splunkhecexporter/config_test.go index 2aa09c2557cf..85c04008b993 100644 --- a/exporter/splunkhecexporter/config_test.go +++ b/exporter/splunkhecexporter/config_test.go @@ -27,6 +27,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/config/configtest" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.uber.org/zap" ) @@ -63,7 +64,20 @@ func TestLoadConfig(t *testing.T) { SourceType: "otel", Index: "metrics", MaxConnections: 100, - Timeout: 10 * time.Second, + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: 10 * time.Second, + }, + RetrySettings: exporterhelper.RetrySettings{ + Enabled: true, + InitialInterval: 10 * time.Second, + MaxInterval: 1 * time.Minute, + MaxElapsedTime: 10 * time.Minute, + }, + QueueSettings: exporterhelper.QueueSettings{ + Enabled: true, + NumConsumers: 2, + QueueSize: 10, + }, } assert.Equal(t, &expectedCfg, e1) diff --git a/exporter/splunkhecexporter/exporter.go b/exporter/splunkhecexporter/exporter.go index 246a3492fee9..cadbb69878d3 100644 --- a/exporter/splunkhecexporter/exporter.go +++ b/exporter/splunkhecexporter/exporter.go @@ -28,7 +28,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/pdata" - "go.opentelemetry.io/collector/obsreport" "go.uber.org/zap" ) @@ -111,39 +110,3 @@ func buildClient(options *exporterOptions, config *Config, logger *zap.Logger) * config: config, } } - -func (se splunkExporter) Start(ctxt context.Context, host component.Host) error { - return se.start(ctxt, host) -} - -func (se splunkExporter) Shutdown(ctxt context.Context) error { - return se.stop(ctxt) -} - -func (se splunkExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error { - ctx = obsreport.StartMetricsExportOp(ctx, typeStr) - numDroppedTimeSeries, err := se.pushMetricsData(ctx, md) - - numReceivedTimeSeries, numPoints := md.MetricAndDataPointCount() - - obsreport.EndMetricsExportOp(ctx, numPoints, numReceivedTimeSeries, numDroppedTimeSeries, err) - return err -} - -func (se splunkExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) error { - ctx = obsreport.StartTraceDataExportOp(ctx, typeStr) - - numDroppedSpans, err := se.pushTraceData(ctx, td) - - obsreport.EndTraceDataExportOp(ctx, td.SpanCount(), numDroppedSpans, err) - return err -} - -func (se splunkExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error { - ctx = obsreport.StartLogsExportOp(ctx, typeStr) - - numDroppedLogs, err := se.pushLogData(ctx, ld) - - obsreport.EndLogsExportOp(ctx, ld.LogRecordCount(), numDroppedLogs, err) - return err -} diff --git a/exporter/splunkhecexporter/exporter_test.go b/exporter/splunkhecexporter/exporter_test.go index 688f4c66cc0b..1f1bc36f6255 100644 --- a/exporter/splunkhecexporter/exporter_test.go +++ b/exporter/splunkhecexporter/exporter_test.go @@ -34,6 +34,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer/consumerdata" "go.opentelemetry.io/collector/consumer/pdata" + "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/testutil/metricstestutil" "go.opentelemetry.io/collector/translator/conventions" "go.opentelemetry.io/collector/translator/internaldata" @@ -48,9 +49,9 @@ func TestNew(t *testing.T) { assert.Nil(t, got) config := &Config{ - Token: "someToken", - Endpoint: "https://example.com:8088", - Timeout: 1 * time.Second, + Token: "someToken", + Endpoint: "https://example.com:8088", + TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1 * time.Second}, } got, err = createExporter(config, zap.NewNop()) assert.NoError(t, err) @@ -320,6 +321,5 @@ func TestExporterStartAlwaysReturnsNil(t *testing.T) { } e, err := createExporter(config, zap.NewNop()) assert.NoError(t, err) - err = e.Start(context.Background(), componenttest.NewNopHost()) - assert.NoError(t, err) + assert.NoError(t, e.start(context.Background(), componenttest.NewNopHost())) } diff --git a/exporter/splunkhecexporter/factory.go b/exporter/splunkhecexporter/factory.go index 67d1d36cdea2..9332f2bbda86 100644 --- a/exporter/splunkhecexporter/factory.go +++ b/exporter/splunkhecexporter/factory.go @@ -47,7 +47,11 @@ func createDefaultConfig() configmodels.Exporter { TypeVal: configmodels.Type(typeStr), NameVal: typeStr, }, - Timeout: defaultHTTPTimeout, + TimeoutSettings: exporterhelper.TimeoutSettings{ + Timeout: defaultHTTPTimeout, + }, + RetrySettings: exporterhelper.CreateDefaultRetrySettings(), + QueueSettings: exporterhelper.CreateDefaultQueueSettings(), DisableCompression: false, MaxConnections: defaultMaxIdleCons, } @@ -64,12 +68,19 @@ func createTraceExporter( expCfg := config.(*Config) exp, err := createExporter(expCfg, params.Logger) - if err != nil { return nil, err } - return exp, nil + return exporterhelper.NewTraceExporter( + expCfg, + exp.pushTraceData, + // explicitly disable since we rely on http.Client timeout logic. + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), + exporterhelper.WithRetry(expCfg.RetrySettings), + exporterhelper.WithQueue(expCfg.QueueSettings), + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.stop)) } func createMetricsExporter( @@ -88,10 +99,22 @@ func createMetricsExporter( return nil, err } - return exp, nil + return exporterhelper.NewMetricsExporter( + expCfg, + exp.pushMetricsData, + // explicitly disable since we rely on http.Client timeout logic. + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), + exporterhelper.WithRetry(expCfg.RetrySettings), + exporterhelper.WithQueue(expCfg.QueueSettings), + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.stop)) } -func createLogsExporter(ctx context.Context, params component.ExporterCreateParams, config configmodels.Exporter) (exporter component.LogsExporter, err error) { +func createLogsExporter( + _ context.Context, + params component.ExporterCreateParams, + config configmodels.Exporter, +) (exporter component.LogsExporter, err error) { if config == nil { return nil, errors.New("nil config") } @@ -103,5 +126,13 @@ func createLogsExporter(ctx context.Context, params component.ExporterCreatePara return nil, err } - return exp, nil + return exporterhelper.NewLogsExporter( + expCfg, + exp.pushLogData, + // explicitly disable since we rely on http.Client timeout logic. + exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}), + exporterhelper.WithRetry(expCfg.RetrySettings), + exporterhelper.WithQueue(expCfg.QueueSettings), + exporterhelper.WithStart(exp.start), + exporterhelper.WithShutdown(exp.stop)) } diff --git a/exporter/splunkhecexporter/testdata/config.yaml b/exporter/splunkhecexporter/testdata/config.yaml index 60e7b8d4afa9..4b4832459d4d 100644 --- a/exporter/splunkhecexporter/testdata/config.yaml +++ b/exporter/splunkhecexporter/testdata/config.yaml @@ -14,6 +14,16 @@ exporters: source: "otel" sourcetype: "otel" index: "metrics" + timeout: 10s + sending_queue: + enabled: true + num_consumers: 2 + queue_size: 10 + retry_on_failure: + enabled: true + initial_interval: 10s + max_interval: 60s + max_elapsed_time: 10m service: pipelines: