Skip to content

Commit

Permalink
[exporterhelper] New exporter helper for custom requests (#8178)
Browse files Browse the repository at this point in the history
Introduce a new exporter helper that operates over client-provided
requests instead of pdata. The helper user now has to provide
`Converter` - an interface with a function implementing translation of
pdata Metrics/Traces/Logs into a user-defined `Request`. `Request` is an
interface with only one required function `Export`.

It opens a door for moving batching to the exporter, where batches will
be built from client data format, instead of pdata. The batches can be
properly sized by custom request size, which can be different from OTLP.
The same custom request sizing will be applied to the sending queue. It
will also improve the performance of the sending queue retries for
non-OTLP exporters, they don't need to translate pdata on every retry.

This is an implementation alternative to
#7874 as
suggested in
#7874 (comment)

Tracking Issue:
#8122

---------

Co-authored-by: Alex Boten <[email protected]>
  • Loading branch information
dmitryax and codeboten authored Aug 21, 2023
1 parent af645d2 commit a2242f7
Show file tree
Hide file tree
Showing 20 changed files with 691 additions and 114 deletions.
12 changes: 12 additions & 0 deletions .chloggen/exporter-helper-v2.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporter/exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Introduce a new exporter helper that operates over client-provided requests instead of pdata

# One or more tracking issues or pull requests related to the change
issues: [7874]

42 changes: 30 additions & 12 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,32 +56,46 @@ func (req *baseRequest) OnProcessingFinished() {
}
}

type queueSettings struct {
config QueueSettings
marshaler internal.RequestMarshaler
unmarshaler internal.RequestUnmarshaler
}

func (qs *queueSettings) persistenceEnabled() bool {
return qs.config.StorageID != nil && qs.marshaler != nil && qs.unmarshaler != nil
}

// baseSettings represents all the options that users can configure.
type baseSettings struct {
component.StartFunc
component.ShutdownFunc
consumerOptions []consumer.Option
TimeoutSettings
QueueSettings
queueSettings
RetrySettings
requestExporter bool
}

// fromOptions returns the internal options starting from the default and applying all configured options.
func fromOptions(options ...Option) *baseSettings {
// Start from the default options:
opts := &baseSettings{
// newBaseSettings returns the baseSettings starting from the default and applying all configured options.
// requestExporter indicates whether the base settings are for a new request exporter or not.
func newBaseSettings(requestExporter bool, options ...Option) *baseSettings {
bs := &baseSettings{
requestExporter: requestExporter,
TimeoutSettings: NewDefaultTimeoutSettings(),
// TODO: Enable queuing by default (call DefaultQueueSettings)
QueueSettings: QueueSettings{Enabled: false},
queueSettings: queueSettings{
config: QueueSettings{Enabled: false},
},
// TODO: Enable retry by default (call DefaultRetrySettings)
RetrySettings: RetrySettings{Enabled: false},
}

for _, op := range options {
op(opts)
op(bs)
}

return opts
return bs
}

// Option apply changes to baseSettings.
Expand Down Expand Up @@ -121,9 +135,13 @@ func WithRetry(retrySettings RetrySettings) Option {

// WithQueue overrides the default QueueSettings for an exporter.
// The default QueueSettings is to disable queueing.
func WithQueue(queueSettings QueueSettings) Option {
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueSettings) Option {
return func(o *baseSettings) {
o.QueueSettings = queueSettings
if o.requestExporter {
panic("queueing is not available for the new request exporters yet")
}
o.queueSettings.config = config
}
}

Expand All @@ -145,7 +163,7 @@ type baseExporter struct {
qrSender *queuedRetrySender
}

func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType, reqUnmarshaler internal.RequestUnmarshaler) (*baseExporter, error) {
func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal component.DataType) (*baseExporter, error) {
be := &baseExporter{}

var err error
Expand All @@ -154,7 +172,7 @@ func newBaseExporter(set exporter.CreateSettings, bs *baseSettings, signal compo
return nil, err
}

be.qrSender = newQueuedRetrySender(set.ID, signal, bs.QueueSettings, bs.RetrySettings, reqUnmarshaler, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.qrSender = newQueuedRetrySender(set.ID, signal, bs.queueSettings, bs.RetrySettings, &timeoutSender{cfg: bs.TimeoutSettings}, set.Logger)
be.sender = be.qrSender
be.StartFunc = func(ctx context.Context, host component.Host) error {
// First start the wrapped exporter.
Expand Down
23 changes: 7 additions & 16 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,8 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal"
"go.opentelemetry.io/collector/exporter/exportertest"
"go.opentelemetry.io/collector/pdata/ptrace"
)

var (
Expand All @@ -35,7 +32,11 @@ var (
)

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, fromOptions(), "", nopRequestUnmarshaler())
be, err := newBaseExporter(defaultSettings, newBaseSettings(false), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
be, err = newBaseExporter(defaultSettings, newBaseSettings(true), "")
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -45,12 +46,12 @@ func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be, err := newBaseExporter(
defaultSettings,
fromOptions(
newBaseSettings(
false,
WithStart(func(ctx context.Context, host component.Host) error { return want }),
WithShutdown(func(ctx context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings())),
"",
nopRequestUnmarshaler(),
)
require.NoError(t, err)
require.Equal(t, want, be.Start(context.Background(), componenttest.NewNopHost()))
Expand All @@ -65,13 +66,3 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
require.Equal(t, codes.Unset, sd.Status().Code, "SpanData %v", sd)
}
}

func nopTracePusher() consumer.ConsumeTracesFunc {
return func(ctx context.Context, ld ptrace.Traces) error {
return nil
}
}

func nopRequestUnmarshaler() internal.RequestUnmarshaler {
return newTraceRequestUnmarshalerFunc(nopTracePusher())
}
6 changes: 6 additions & 0 deletions exporter/exporterhelper/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,10 @@ var (
errNilPushMetricsData = errors.New("nil PushMetrics")
// errNilPushLogsData is returned when a nil PushLogs is given.
errNilPushLogsData = errors.New("nil PushLogs")
// errNilTracesConverter is returned when a nil TracesConverter is given.
errNilTracesConverter = errors.New("nil TracesConverter")
// errNilMetricsConverter is returned when a nil MetricsConverter is given.
errNilMetricsConverter = errors.New("nil MetricsConverter")
// errNilLogsConverter is returned when a nil LogsConverter is given.
errNilLogsConverter = errors.New("nil LogsConverter")
)
14 changes: 12 additions & 2 deletions exporter/exporterhelper/internal/persistent_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,21 @@ func buildPersistentStorageName(name string, signal component.DataType) string {
return fmt.Sprintf("%s-%s", name, signal)
}

type PersistentQueueSettings struct {
Name string
Signal component.DataType
Capacity uint64
Logger *zap.Logger
Client storage.Client
Unmarshaler RequestUnmarshaler
Marshaler RequestMarshaler
}

// NewPersistentQueue creates a new queue backed by file storage; name and signal must be a unique combination that identifies the queue storage
func NewPersistentQueue(ctx context.Context, name string, signal component.DataType, capacity int, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) ProducerConsumerQueue {
func NewPersistentQueue(ctx context.Context, params PersistentQueueSettings) ProducerConsumerQueue {
return &persistentQueue{
stopChan: make(chan struct{}),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(name, signal), uint64(capacity), logger, client, unmarshaler),
storage: newPersistentContiguousStorage(ctx, buildPersistentStorageName(params.Name, params.Signal), params),
}
}

Expand Down
10 changes: 9 additions & 1 deletion exporter/exporterhelper/internal/persistent_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,15 @@ func createTestQueue(extension storage.Extension, capacity int) *persistentQueue
panic(err)
}

wq := NewPersistentQueue(context.Background(), "foo", component.DataTypeTraces, capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
wq := NewPersistentQueue(context.Background(), PersistentQueueSettings{
Name: "foo",
Signal: component.DataTypeTraces,
Capacity: uint64(capacity),
Logger: logger,
Client: client,
Unmarshaler: newFakeTracesRequestUnmarshalerFunc(),
Marshaler: newFakeTracesRequestMarshalerFunc(),
})
return wq.(*persistentQueue)
}

Expand Down
14 changes: 8 additions & 6 deletions exporter/exporterhelper/internal/persistent_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type persistentContiguousStorage struct {
queueName string
client storage.Client
unmarshaler RequestUnmarshaler
marshaler RequestMarshaler

putChan chan struct{}
stopChan chan struct{}
Expand Down Expand Up @@ -80,14 +81,15 @@ var (

// newPersistentContiguousStorage creates a new file-storage extension backed queue;
// queueName parameter must be a unique value that identifies the queue.
func newPersistentContiguousStorage(ctx context.Context, queueName string, capacity uint64, logger *zap.Logger, client storage.Client, unmarshaler RequestUnmarshaler) *persistentContiguousStorage {
func newPersistentContiguousStorage(ctx context.Context, queueName string, set PersistentQueueSettings) *persistentContiguousStorage {
pcs := &persistentContiguousStorage{
logger: logger,
client: client,
logger: set.Logger,
client: set.Client,
queueName: queueName,
unmarshaler: unmarshaler,
capacity: capacity,
putChan: make(chan struct{}, capacity),
unmarshaler: set.Unmarshaler,
marshaler: set.Marshaler,
capacity: set.Capacity,
putChan: make(chan struct{}, set.Capacity),
reqChan: make(chan Request),
stopChan: make(chan struct{}),
itemsCount: &atomic.Uint64{},
Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/internal/persistent_storage_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error)

// setRequest adds Set operation over a given request to the batch
func (bof *batchStruct) setRequest(key string, value Request) *batchStruct {
return bof.set(key, value, requestToBytes)
return bof.set(key, value, bof.requestToBytes)
}

// setItemIndex adds Set operation over a given itemIndex to the batch
Expand Down Expand Up @@ -206,8 +206,8 @@ func bytesToItemIndexArray(b []byte) (any, error) {
return val, err
}

func requestToBytes(req any) ([]byte, error) {
return req.(Request).Marshal()
func (bof *batchStruct) requestToBytes(req any) ([]byte, error) {
return bof.pcs.marshaler(req.(Request))
}

func (bof *batchStruct) bytesToRequest(b []byte) (any, error) {
Expand Down
15 changes: 14 additions & 1 deletion exporter/exporterhelper/internal/persistent_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,13 @@ func createTestClient(extension storage.Extension) storage.Client {
}

func createTestPersistentStorageWithLoggingAndCapacity(client storage.Client, logger *zap.Logger, capacity uint64) *persistentContiguousStorage {
return newPersistentContiguousStorage(context.Background(), "foo", capacity, logger, client, newFakeTracesRequestUnmarshalerFunc())
return newPersistentContiguousStorage(context.Background(), "foo", PersistentQueueSettings{
Capacity: capacity,
Logger: logger,
Client: client,
Unmarshaler: newFakeTracesRequestUnmarshalerFunc(),
Marshaler: newFakeTracesRequestMarshalerFunc(),
})
}

func createTestPersistentStorage(client storage.Client) *persistentContiguousStorage {
Expand Down Expand Up @@ -82,6 +88,13 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler {
}
}

func newFakeTracesRequestMarshalerFunc() RequestMarshaler {
return func(req Request) ([]byte, error) {
marshaler := ptrace.ProtoMarshaler{}
return marshaler.MarshalTraces(req.(*fakeTracesRequest).td)
}
}

func TestPersistentStorage_CorruptedData(t *testing.T) {
path := t.TempDir()

Expand Down
6 changes: 3 additions & 3 deletions exporter/exporterhelper/internal/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,6 @@ type Request interface {
// Count returns the count of spans/metric points or log records.
Count() int

// Marshal serializes the current request into a byte stream
Marshal() ([]byte, error)

// OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished
OnProcessingFinished()

Expand All @@ -34,3 +31,6 @@ type Request interface {

// RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request
type RequestUnmarshaler func([]byte) (Request, error)

// RequestMarshaler defines a function which takes a request and marshals it into a byte slice
type RequestMarshaler func(Request) ([]byte, error)
Loading

0 comments on commit a2242f7

Please sign in to comment.