Skip to content

Commit

Permalink
Enable queuing and retries for splunk hec exporter (#1222)
Browse files Browse the repository at this point in the history
* Enable queuing and retries for splunk hec exporter

Signed-off-by: Bogdan Drutu <[email protected]>

* Update readme for queuing and retry

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Oct 8, 2020
1 parent d51340b commit 482d718
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 71 deletions.
2 changes: 2 additions & 0 deletions exporter/splunkhecexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ The following configuration options can also be configured:
- `disable_compression` (default: false): Whether to disable gzip compression over HTTP.
- `timeout` (default: 10s): HTTP timeout when sending data.
- `insecure_skip_verify` (default: false): Whether to skip checking the certificate of the HEC endpoint when sending data over HTTPS.
- To configure queuing and retries see [here](https://github.com/open-telemetry/opentelemetry-collector/tree/master/exporter/exporterhelper#configuration)

Example:

```yaml
Expand Down
12 changes: 6 additions & 6 deletions exporter/splunkhecexporter/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type client struct {
}

func (c *client) pushMetricsData(
_ context.Context,
ctx context.Context,
md pdata.Metrics,
) (droppedTimeSeries int, err error) {
c.wg.Add(1)
Expand All @@ -65,7 +65,7 @@ func (c *client) pushMetricsData(
return numMetricPoint(md), consumererror.Permanent(err)
}

req, err := http.NewRequest("POST", c.url.String(), body)
req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), body)
if err != nil {
return numMetricPoint(md), consumererror.Permanent(err)
}
Expand Down Expand Up @@ -110,21 +110,21 @@ func (c *client) pushTraceData(
return numDroppedSpans, nil
}

err = c.sendSplunkEvents(splunkEvents)
err = c.sendSplunkEvents(ctx, splunkEvents)
if err != nil {
return td.SpanCount(), err
}

return numDroppedSpans, nil
}

func (c *client) sendSplunkEvents(splunkEvents []*splunkEvent) error {
func (c *client) sendSplunkEvents(ctx context.Context, splunkEvents []*splunkEvent) error {
body, compressed, err := encodeBodyEvents(&c.zippers, splunkEvents, c.config.DisableCompression)
if err != nil {
return consumererror.Permanent(err)
}

req, err := http.NewRequest("POST", c.url.String(), body)
req, err := http.NewRequestWithContext(ctx, "POST", c.url.String(), body)
if err != nil {
return consumererror.Permanent(err)
}
Expand Down Expand Up @@ -165,7 +165,7 @@ func (c *client) pushLogData(ctx context.Context, ld pdata.Logs) (numDroppedLogs
return numDroppedLogs, nil
}

err = c.sendSplunkEvents(splunkEvents)
err = c.sendSplunkEvents(ctx, splunkEvents)
if err != nil {
return ld.LogRecordCount(), err
}
Expand Down
46 changes: 36 additions & 10 deletions exporter/splunkhecexporter/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ func runMetricsExport(disableCompression bool, numberOfDataPoints int, t *testin
params := component.ExporterCreateParams{Logger: zap.NewNop()}
exporter, err := factory.CreateMetricsExporter(context.Background(), params, cfg)
assert.NoError(t, err)
assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost()))
defer exporter.Shutdown(context.Background())

md := createMetricsData(numberOfDataPoints)

Expand Down Expand Up @@ -210,6 +212,8 @@ func runTraceExport(disableCompression bool, numberOfTraces int, t *testing.T) (
params := component.ExporterCreateParams{Logger: zap.NewNop()}
exporter, err := factory.CreateTraceExporter(context.Background(), params, cfg)
assert.NoError(t, err)
assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost()))
defer exporter.Shutdown(context.Background())

td := createTraceData(numberOfTraces)

Expand Down Expand Up @@ -246,6 +250,8 @@ func runLogExport(disableCompression bool, numberOfLogs int, t *testing.T) (stri
params := component.ExporterCreateParams{Logger: zap.NewNop()}
exporter, err := factory.CreateLogsExporter(context.Background(), params, cfg)
assert.NoError(t, err)
assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost()))
defer exporter.Shutdown(context.Background())

ld := createLogData(numberOfLogs)

Expand Down Expand Up @@ -330,14 +336,19 @@ func TestErrorReceived(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Endpoint = "http://" + listener.Addr().String() + "/services/collector"
// Disable QueueSettings to ensure that we execute the request when calling ConsumeTraces
// otherwise we will not see the error.
cfg.QueueSettings.Enabled = false
// Disable retries to not wait too much time for the return error.
cfg.RetrySettings.Enabled = false
cfg.DisableCompression = true
cfg.Token = "1234-1234"

params := component.ExporterCreateParams{Logger: zap.NewNop()}
exporter, err := factory.CreateTraceExporter(context.Background(), params, cfg)
assert.NoError(t, err)

assert.NoError(t, err)
assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost()))
defer exporter.Shutdown(context.Background())

td := createTraceData(3)

Expand Down Expand Up @@ -368,11 +379,18 @@ func TestInvalidMetrics(t *testing.T) {
func TestInvalidURL(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
// Disable queuing to ensure that we execute the request when calling ConsumeTraces
// otherwise we will not see the error.
cfg.QueueSettings.Enabled = false
// Disable retries to not wait too much time for the return error.
cfg.RetrySettings.Enabled = false
cfg.Endpoint = "ftp://example.com:134"
cfg.Token = "1234-1234"
params := component.ExporterCreateParams{Logger: zap.NewNop()}
exporter, err := factory.CreateTraceExporter(context.Background(), params, cfg)
assert.NoError(t, err)
assert.NoError(t, exporter.Start(context.Background(), componenttest.NewNopHost()))
defer exporter.Shutdown(context.Background())

td := createTraceData(2)

Expand Down Expand Up @@ -417,17 +435,25 @@ func TestInvalidJsonClient(t *testing.T) {
},
nil,
}
c := client{url: nil, zippers: sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}}, config: &Config{Timeout: time.Microsecond}}
err := c.sendSplunkEvents(evs)
c := client{
url: nil,
zippers: sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
config: &Config{},
}
err := c.sendSplunkEvents(context.Background(), evs)
assert.EqualError(t, err, "Permanent error: json: unsupported value: +Inf")
}

func TestInvalidURLClient(t *testing.T) {
c := client{url: &url.URL{Host: "in va lid"}, zippers: sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}}, config: &Config{Timeout: time.Microsecond}}
err := c.sendSplunkEvents([]*splunkEvent{})
c := client{
url: &url.URL{Host: "in va lid"},
zippers: sync.Pool{New: func() interface{} {
return gzip.NewWriter(nil)
}},
config: &Config{},
}
err := c.sendSplunkEvents(context.Background(), []*splunkEvent{})
assert.EqualError(t, err, "Permanent error: parse \"//in%20va%20lid\": invalid URL escape \"%20\"")
}
11 changes: 5 additions & 6 deletions exporter/splunkhecexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ import (
"fmt"
"net/url"
"path"
"time"

"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/exporter/exporterhelper"
)

const (
Expand All @@ -31,7 +31,10 @@ const (

// Config defines configuration for Splunk exporter.
type Config struct {
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
configmodels.ExporterSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.TimeoutSettings `mapstructure:",squash"` // squash ensures fields are correctly decoded in embedded struct.
exporterhelper.QueueSettings `mapstructure:"sending_queue"`
exporterhelper.RetrySettings `mapstructure:"retry_on_failure"`

// HEC Token is the authentication token provided by Splunk.
Token string `mapstructure:"token"`
Expand All @@ -55,10 +58,6 @@ type Config struct {
// Disable GZip compression. Defaults to false.
DisableCompression bool `mapstructure:"disable_compression"`

// Timeout is the maximum timeout for HTTP request sending trace data. The
// default value is 10 seconds.
Timeout time.Duration `mapstructure:"timeout"`

// insecure_skip_verify skips checking the certificate of the HEC endpoint when sending data over HTTPS. Defaults to false.
InsecureSkipVerify bool `mapstructure:"insecure_skip_verify"`
}
Expand Down
16 changes: 15 additions & 1 deletion exporter/splunkhecexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configmodels"
"go.opentelemetry.io/collector/config/configtest"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -63,7 +64,20 @@ func TestLoadConfig(t *testing.T) {
SourceType: "otel",
Index: "metrics",
MaxConnections: 100,
Timeout: 10 * time.Second,
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: 10 * time.Second,
},
RetrySettings: exporterhelper.RetrySettings{
Enabled: true,
InitialInterval: 10 * time.Second,
MaxInterval: 1 * time.Minute,
MaxElapsedTime: 10 * time.Minute,
},
QueueSettings: exporterhelper.QueueSettings{
Enabled: true,
NumConsumers: 2,
QueueSize: 10,
},
}
assert.Equal(t, &expectedCfg, e1)

Expand Down
37 changes: 0 additions & 37 deletions exporter/splunkhecexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/obsreport"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -111,39 +110,3 @@ func buildClient(options *exporterOptions, config *Config, logger *zap.Logger) *
config: config,
}
}

func (se splunkExporter) Start(ctxt context.Context, host component.Host) error {
return se.start(ctxt, host)
}

func (se splunkExporter) Shutdown(ctxt context.Context) error {
return se.stop(ctxt)
}

func (se splunkExporter) ConsumeMetrics(ctx context.Context, md pdata.Metrics) error {
ctx = obsreport.StartMetricsExportOp(ctx, typeStr)
numDroppedTimeSeries, err := se.pushMetricsData(ctx, md)

numReceivedTimeSeries, numPoints := md.MetricAndDataPointCount()

obsreport.EndMetricsExportOp(ctx, numPoints, numReceivedTimeSeries, numDroppedTimeSeries, err)
return err
}

func (se splunkExporter) ConsumeTraces(ctx context.Context, td pdata.Traces) error {
ctx = obsreport.StartTraceDataExportOp(ctx, typeStr)

numDroppedSpans, err := se.pushTraceData(ctx, td)

obsreport.EndTraceDataExportOp(ctx, td.SpanCount(), numDroppedSpans, err)
return err
}

func (se splunkExporter) ConsumeLogs(ctx context.Context, ld pdata.Logs) error {
ctx = obsreport.StartLogsExportOp(ctx, typeStr)

numDroppedLogs, err := se.pushLogData(ctx, ld)

obsreport.EndLogsExportOp(ctx, ld.LogRecordCount(), numDroppedLogs, err)
return err
}
10 changes: 5 additions & 5 deletions exporter/splunkhecexporter/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer/consumerdata"
"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/testutil/metricstestutil"
"go.opentelemetry.io/collector/translator/conventions"
"go.opentelemetry.io/collector/translator/internaldata"
Expand All @@ -48,9 +49,9 @@ func TestNew(t *testing.T) {
assert.Nil(t, got)

config := &Config{
Token: "someToken",
Endpoint: "https://example.com:8088",
Timeout: 1 * time.Second,
Token: "someToken",
Endpoint: "https://example.com:8088",
TimeoutSettings: exporterhelper.TimeoutSettings{Timeout: 1 * time.Second},
}
got, err = createExporter(config, zap.NewNop())
assert.NoError(t, err)
Expand Down Expand Up @@ -320,6 +321,5 @@ func TestExporterStartAlwaysReturnsNil(t *testing.T) {
}
e, err := createExporter(config, zap.NewNop())
assert.NoError(t, err)
err = e.Start(context.Background(), componenttest.NewNopHost())
assert.NoError(t, err)
assert.NoError(t, e.start(context.Background(), componenttest.NewNopHost()))
}
43 changes: 37 additions & 6 deletions exporter/splunkhecexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ func createDefaultConfig() configmodels.Exporter {
TypeVal: configmodels.Type(typeStr),
NameVal: typeStr,
},
Timeout: defaultHTTPTimeout,
TimeoutSettings: exporterhelper.TimeoutSettings{
Timeout: defaultHTTPTimeout,
},
RetrySettings: exporterhelper.CreateDefaultRetrySettings(),
QueueSettings: exporterhelper.CreateDefaultQueueSettings(),
DisableCompression: false,
MaxConnections: defaultMaxIdleCons,
}
Expand All @@ -64,12 +68,19 @@ func createTraceExporter(
expCfg := config.(*Config)

exp, err := createExporter(expCfg, params.Logger)

if err != nil {
return nil, err
}

return exp, nil
return exporterhelper.NewTraceExporter(
expCfg,
exp.pushTraceData,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(expCfg.RetrySettings),
exporterhelper.WithQueue(expCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.stop))
}

func createMetricsExporter(
Expand All @@ -88,10 +99,22 @@ func createMetricsExporter(
return nil, err
}

return exp, nil
return exporterhelper.NewMetricsExporter(
expCfg,
exp.pushMetricsData,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(expCfg.RetrySettings),
exporterhelper.WithQueue(expCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.stop))
}

func createLogsExporter(ctx context.Context, params component.ExporterCreateParams, config configmodels.Exporter) (exporter component.LogsExporter, err error) {
func createLogsExporter(
_ context.Context,
params component.ExporterCreateParams,
config configmodels.Exporter,
) (exporter component.LogsExporter, err error) {
if config == nil {
return nil, errors.New("nil config")
}
Expand All @@ -103,5 +126,13 @@ func createLogsExporter(ctx context.Context, params component.ExporterCreatePara
return nil, err
}

return exp, nil
return exporterhelper.NewLogsExporter(
expCfg,
exp.pushLogData,
// explicitly disable since we rely on http.Client timeout logic.
exporterhelper.WithTimeout(exporterhelper.TimeoutSettings{Timeout: 0}),
exporterhelper.WithRetry(expCfg.RetrySettings),
exporterhelper.WithQueue(expCfg.QueueSettings),
exporterhelper.WithStart(exp.start),
exporterhelper.WithShutdown(exp.stop))
}
10 changes: 10 additions & 0 deletions exporter/splunkhecexporter/testdata/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,16 @@ exporters:
source: "otel"
sourcetype: "otel"
index: "metrics"
timeout: 10s
sending_queue:
enabled: true
num_consumers: 2
queue_size: 10
retry_on_failure:
enabled: true
initial_interval: 10s
max_interval: 60s
max_elapsed_time: 10m

service:
pipelines:
Expand Down

0 comments on commit 482d718

Please sign in to comment.