Skip to content

Commit

Permalink
[exporterhelper] Add observability to the new exporter helper (#8244)
Browse files Browse the repository at this point in the history
This change adds collector's internal metrics and tracing to the new
request-based exporter helpers. Only those metrics and traces are added
that are already adopted by the existing exporter helpers for backward
compatibility. The new exporter helpers can and should expose more
metrics in the future, e.g. for tracking converter errors.

Tracking Issue:
#8122
  • Loading branch information
dmitryax authored Aug 21, 2023
1 parent 76e0540 commit 3dad181
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 12 deletions.
17 changes: 13 additions & 4 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,12 @@ func NewLogsRequestExporter(
if err != nil {
return nil, err
}

// TODO: Add new observability tracing/metrics to the new exporterhelper.
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &logsExporterWithObservability{
obsrep: be.obsrep,
nextSender: nextSender,
}
})

lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
req, cErr := converter.RequestFromLogs(ctx, ld)
Expand All @@ -160,10 +164,15 @@ func NewLogsRequestExporter(
zap.Error(err))
return consumererror.NewPermanent(cErr)
}
return be.sender.send(&request{
r := &request{
baseRequest: baseRequest{ctx: ctx},
Request: req,
})
}
sErr := be.sender.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count()))
}
return sErr
}, bs.consumerOptions...)

return &logsExporter{
Expand Down
53 changes: 53 additions & 0 deletions exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,18 @@ func TestLogsExporter_WithRecordMetrics(t *testing.T) {
checkRecordedMetricsForLogsExporter(t, tt, le, nil)
}

func TestLogsRequestExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{})
require.NoError(t, err)
require.NotNil(t, le)

checkRecordedMetricsForLogsExporter(t, tt, le, nil)
}

func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
want := errors.New("my_error")
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
Expand All @@ -173,6 +185,20 @@ func TestLogsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
checkRecordedMetricsForLogsExporter(t, tt, le, want)
}

func TestLogsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) {
want := errors.New("export_error")
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

le, err := NewLogsRequestExporter(context.Background(), tt.ToExporterCreateSettings(),
&fakeRequestConverter{requestError: want})
require.Nil(t, err)
require.NotNil(t, le)

checkRecordedMetricsForLogsExporter(t, tt, le, want)
}

func TestLogsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeLogsExporterName)
require.NoError(t, err)
Expand Down Expand Up @@ -211,6 +237,19 @@ func TestLogsExporter_WithSpan(t *testing.T) {
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1)
}

func TestLogsRequestExporter_WithSpan(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{})
require.Nil(t, err)
require.NotNil(t, le)
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil, 1)
}

func TestLogsExporter_WithSpan_ReturnError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
Expand All @@ -225,6 +264,20 @@ func TestLogsExporter_WithSpan_ReturnError(t *testing.T) {
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1)
}

func TestLogsRequestExporter_WithSpan_ReturnError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

want := errors.New("my_error")
le, err := NewLogsRequestExporter(context.Background(), set, &fakeRequestConverter{requestError: want})
require.Nil(t, err)
require.NotNil(t, le)
checkWrapSpanForLogsExporter(t, sr, set.TracerProvider.Tracer("test"), le, want, 1)
}

func TestLogsExporter_WithShutdown(t *testing.T) {
shutdownCalled := false
shutdown := func(context.Context) error { shutdownCalled = true; return nil }
Expand Down
17 changes: 13 additions & 4 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,12 @@ func NewMetricsRequestExporter(
if err != nil {
return nil, err
}

// TODO: Add new observability tracing/metrics to the new exporterhelper.
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &metricsSenderWithObservability{
obsrep: be.obsrep,
nextSender: nextSender,
}
})

mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
req, cErr := converter.RequestFromMetrics(ctx, md)
Expand All @@ -160,10 +164,15 @@ func NewMetricsRequestExporter(
zap.Error(err))
return consumererror.NewPermanent(cErr)
}
return be.sender.send(&request{
r := &request{
Request: req,
baseRequest: baseRequest{ctx: ctx},
})
}
sErr := be.sender.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count()))
}
return sErr
}, bs.consumerOptions...)

return &metricsExporter{
Expand Down
52 changes: 52 additions & 0 deletions exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,18 @@ func TestMetricsExporter_WithRecordMetrics(t *testing.T) {
checkRecordedMetricsForMetricsExporter(t, tt, me, nil)
}

func TestMetricsRequestExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), fakeRequestConverter{})
require.NoError(t, err)
require.NotNil(t, me)

checkRecordedMetricsForMetricsExporter(t, tt, me, nil)
}

func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
want := errors.New("my_error")
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
Expand All @@ -174,6 +186,19 @@ func TestMetricsExporter_WithRecordMetrics_ReturnError(t *testing.T) {
checkRecordedMetricsForMetricsExporter(t, tt, me, want)
}

func TestMetricsRequestExporter_WithRecordMetrics_ExportError(t *testing.T) {
want := errors.New("my_error")
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

me, err := NewMetricsRequestExporter(context.Background(), tt.ToExporterCreateSettings(), fakeRequestConverter{requestError: want})
require.NoError(t, err)
require.NotNil(t, me)

checkRecordedMetricsForMetricsExporter(t, tt, me, want)
}

func TestMetricsExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeMetricsExporterName)
require.NoError(t, err)
Expand Down Expand Up @@ -212,6 +237,19 @@ func TestMetricsExporter_WithSpan(t *testing.T) {
checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, nil, 2)
}

func TestMetricsRequestExporter_WithSpan(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

me, err := NewMetricsRequestExporter(context.Background(), set, fakeRequestConverter{})
require.NoError(t, err)
require.NotNil(t, me)
checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, nil, 2)
}

func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
Expand All @@ -226,6 +264,20 @@ func TestMetricsExporter_WithSpan_ReturnError(t *testing.T) {
checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2)
}

func TestMetricsRequestExporter_WithSpan_ExportError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

want := errors.New("my_error")
me, err := NewMetricsRequestExporter(context.Background(), set, fakeRequestConverter{requestError: want})
require.NoError(t, err)
require.NotNil(t, me)
checkWrapSpanForMetricsExporter(t, sr, set.TracerProvider.Tracer("test"), me, want, 2)
}

func TestMetricsExporter_WithShutdown(t *testing.T) {
shutdownCalled := false
shutdown := func(context.Context) error { shutdownCalled = true; return nil }
Expand Down
17 changes: 13 additions & 4 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,12 @@ func NewTracesRequestExporter(
if err != nil {
return nil, err
}

// TODO: Add new observability tracing/metrics to the new exporterhelper.
be.wrapConsumerSender(func(nextSender requestSender) requestSender {
return &tracesExporterWithObservability{
obsrep: be.obsrep,
nextSender: nextSender,
}
})

tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
req, cErr := converter.RequestFromTraces(ctx, td)
Expand All @@ -160,10 +164,15 @@ func NewTracesRequestExporter(
zap.Error(err))
return consumererror.NewPermanent(cErr)
}
return be.sender.send(&request{
r := &request{
baseRequest: baseRequest{ctx: ctx},
Request: req,
})
}
sErr := be.sender.send(r)
if errors.Is(sErr, errSendingQueueIsFull) {
be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count()))
}
return sErr
}, bs.consumerOptions...)

return &traceExporter{
Expand Down
54 changes: 54 additions & 0 deletions exporter/exporterhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,18 @@ func TestTracesExporter_WithRecordMetrics(t *testing.T) {
checkRecordedMetricsForTracesExporter(t, tt, te, nil)
}

func TestTracesRequestExporter_WithRecordMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{})
require.NoError(t, err)
require.NotNil(t, te)

checkRecordedMetricsForTracesExporter(t, tt, te, nil)
}

func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) {
want := errors.New("my_error")
tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName)
Expand All @@ -171,6 +183,19 @@ func TestTracesExporter_WithRecordMetrics_ReturnError(t *testing.T) {
checkRecordedMetricsForTracesExporter(t, tt, te, want)
}

func TestTracesRequestExporter_WithRecordMetrics_RequestSenderError(t *testing.T) {
want := errors.New("export_error")
tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName)
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := NewTracesRequestExporter(context.Background(), tt.ToExporterCreateSettings(), &fakeRequestConverter{requestError: want})
require.NoError(t, err)
require.NotNil(t, te)

checkRecordedMetricsForTracesExporter(t, tt, te, want)
}

func TestTracesExporter_WithRecordEnqueueFailedMetrics(t *testing.T) {
tt, err := obsreporttest.SetupTelemetry(fakeTracesExporterName)
require.NoError(t, err)
Expand Down Expand Up @@ -210,6 +235,20 @@ func TestTracesExporter_WithSpan(t *testing.T) {
checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, nil, 1)
}

func TestTracesRequestExporter_WithSpan(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{})
require.NoError(t, err)
require.NotNil(t, te)

checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, nil, 1)
}

func TestTracesExporter_WithSpan_ReturnError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
Expand All @@ -225,6 +264,21 @@ func TestTracesExporter_WithSpan_ReturnError(t *testing.T) {
checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1)
}

func TestTracesRequestExporter_WithSpan_ExportError(t *testing.T) {
set := exportertest.NewNopCreateSettings()
sr := new(tracetest.SpanRecorder)
set.TracerProvider = sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(sr))
otel.SetTracerProvider(set.TracerProvider)
defer otel.SetTracerProvider(trace.NewNoopTracerProvider())

want := errors.New("export_error")
te, err := NewTracesRequestExporter(context.Background(), set, &fakeRequestConverter{requestError: want})
require.NoError(t, err)
require.NotNil(t, te)

checkWrapSpanForTracesExporter(t, sr, set.TracerProvider.Tracer("test"), te, want, 1)
}

func TestTracesExporter_WithShutdown(t *testing.T) {
shutdownCalled := false
shutdown := func(context.Context) error { shutdownCalled = true; return nil }
Expand Down

0 comments on commit 3dad181

Please sign in to comment.