From 26e138faa3920162625023a20b907b9c0d8fa2a5 Mon Sep 17 00:00:00 2001 From: Dmitry Anoshin Date: Tue, 12 Sep 2023 20:51:16 -0700 Subject: [PATCH] [exporterhelper] Convert internal request interface into a struct At this point we have two interfaces: public and internal. Each interface have their own implementation which makes it hard to read. This change replaces the internal interface with a struct embedding the public interface. This requires moving the public interface to a separate package which makes the module structure cleaner anyway --- .../exporter-helper-add-request-onerror.yaml | 18 ++++++ .chloggen/exporter-helper-move-request.yaml | 18 ++++++ exporter/exporterhelper/common.go | 30 +-------- .../internal/bounded_memory_queue.go | 6 +- .../internal/bounded_memory_queue_test.go | 25 ++++---- .../internal/persistent_queue.go | 2 +- .../internal/persistent_queue_test.go | 10 +-- .../internal/persistent_storage.go | 24 ++++---- .../internal/persistent_storage_batch.go | 8 +-- .../internal/persistent_storage_test.go | 36 +++++------ .../internal/producer_consumer_queue.go | 4 +- exporter/exporterhelper/internal/request.go | 60 ++++++++++++------ exporter/exporterhelper/logs.go | 34 ++++++----- exporter/exporterhelper/metrics.go | 34 ++++++----- exporter/exporterhelper/queue_sender.go | 6 +- exporter/exporterhelper/queue_sender_test.go | 17 +++--- exporter/exporterhelper/request.go | 61 ------------------- exporter/exporterhelper/request/request.go | 51 ++++++++++++++++ exporter/exporterhelper/request_test.go | 7 ++- exporter/exporterhelper/retry_sender.go | 6 +- exporter/exporterhelper/retry_sender_test.go | 56 +++++++---------- exporter/exporterhelper/timeout_sender.go | 2 +- exporter/exporterhelper/traces.go | 34 ++++++----- 23 files changed, 284 insertions(+), 265 deletions(-) create mode 100755 .chloggen/exporter-helper-add-request-onerror.yaml create mode 100755 .chloggen/exporter-helper-move-request.yaml delete mode 100644 exporter/exporterhelper/request.go create mode 100644 exporter/exporterhelper/request/request.go diff --git a/.chloggen/exporter-helper-add-request-onerror.yaml b/.chloggen/exporter-helper-add-request-onerror.yaml new file mode 100755 index 00000000000..c0a706c6560 --- /dev/null +++ b/.chloggen/exporter-helper-add-request-onerror.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhgelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add new optional interface `request.ErrorHandler` to the helper. + +# One or more tracking issues or pull requests related to the change +issues: [8435] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/exporter-helper-move-request.yaml b/.chloggen/exporter-helper-move-request.yaml new file mode 100755 index 00000000000..6f6ba816051 --- /dev/null +++ b/.chloggen/exporter-helper-move-request.yaml @@ -0,0 +1,18 @@ +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. 
otlpreceiver) +component: exporterhgelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Move request related API to a separate package + +# One or more tracking issues or pull requests related to the change +issues: [8435] + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index f6277404619..741de099ecb 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -21,7 +21,7 @@ import ( type requestSender interface { start(ctx context.Context, host component.Host, set exporter.CreateSettings) error shutdown() - send(req internal.Request) error + send(req *internal.Request) error setNextSender(nextSender requestSender) } @@ -37,7 +37,7 @@ func (b *baseRequestSender) start(context.Context, component.Host, exporter.Crea func (b *baseRequestSender) shutdown() {} -func (b *baseRequestSender) send(req internal.Request) error { +func (b *baseRequestSender) send(req *internal.Request) error { return b.nextSender.send(req) } @@ -47,30 +47,6 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) { type obsrepSenderFactory func(obsrep *obsExporter) requestSender -// baseRequest is a base implementation for the internal.Request. -type baseRequest struct { - ctx context.Context - processingFinishedCallback func() -} - -func (req *baseRequest) Context() context.Context { - return req.ctx -} - -func (req *baseRequest) SetContext(ctx context.Context) { - req.ctx = ctx -} - -func (req *baseRequest) SetOnProcessingFinished(callback func()) { - req.processingFinishedCallback = callback -} - -func (req *baseRequest) OnProcessingFinished() { - if req.processingFinishedCallback != nil { - req.processingFinishedCallback() - } -} - // Option apply changes to baseExporter. type Option func(*baseExporter) @@ -199,7 +175,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req } // send sends the request using the first sender in the chain. -func (be *baseExporter) send(req internal.Request) error { +func (be *baseExporter) send(req *internal.Request) error { return be.queueSender.send(req) } diff --git a/exporter/exporterhelper/internal/bounded_memory_queue.go b/exporter/exporterhelper/internal/bounded_memory_queue.go index c7f8655338a..f3ab05cabdc 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue.go @@ -20,7 +20,7 @@ type boundedMemoryQueue struct { stopWG sync.WaitGroup size *atomic.Uint32 stopped *atomic.Bool - items chan Request + items chan *Request capacity uint32 numConsumers int } @@ -29,7 +29,7 @@ type boundedMemoryQueue struct { // callback for dropped items (e.g. useful to emit metrics). func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue { return &boundedMemoryQueue{ - items: make(chan Request, capacity), + items: make(chan *Request, capacity), stopped: &atomic.Bool{}, size: &atomic.Uint32{}, capacity: uint32(capacity), @@ -58,7 +58,7 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu } // Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow. 
-func (q *boundedMemoryQueue) Produce(item Request) bool { +func (q *boundedMemoryQueue) Produce(item *Request) bool { if q.stopped.Load() { return false } diff --git a/exporter/exporterhelper/internal/bounded_memory_queue_test.go b/exporter/exporterhelper/internal/bounded_memory_queue_test.go index 9fe809cf2a2..62208faae02 100644 --- a/exporter/exporterhelper/internal/bounded_memory_queue_test.go +++ b/exporter/exporterhelper/internal/bounded_memory_queue_test.go @@ -21,7 +21,7 @@ import ( "go.opentelemetry.io/collector/exporter/exportertest" ) -func newNopQueueSettings(callback func(item Request)) QueueSettings { +func newNopQueueSettings(callback func(item *Request)) QueueSettings { return QueueSettings{ CreateSettings: exportertest.NewNopCreateSettings(), DataType: component.DataTypeMetrics, @@ -30,18 +30,21 @@ func newNopQueueSettings(callback func(item Request)) QueueSettings { } type stringRequest struct { - Request str string } -func newStringRequest(str string) Request { - return stringRequest{str: str} +func (s stringRequest) Export(_ context.Context) error { + return nil +} + +func newStringRequest(str string) *Request { + return NewRequest(context.Background(), stringRequest{str: str}) } // In this test we run a queue with capacity 1 and a single consumer. // We want to test the overflow behavior, so we block the consumer // by holding a startLock before submitting items to the queue. -func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerFn func(item Request))) { +func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerFn func(item *Request))) { q := NewBoundedMemoryQueue(1, 1) var startLock sync.Mutex @@ -49,8 +52,8 @@ func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerF startLock.Lock() // block consumers consumerState := newConsumerState(t) - startConsumers(q, func(item Request) { - consumerState.record(item.(stringRequest).str) + startConsumers(q, func(item *Request) { + consumerState.record(item.Request.(stringRequest).str) // block further processing until startLock is released startLock.Lock() @@ -100,7 +103,7 @@ func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerF } func TestBoundedQueue(t *testing.T) { - helper(t, func(q ProducerConsumerQueue, consumerFn func(item Request)) { + helper(t, func(q ProducerConsumerQueue, consumerFn func(item *Request)) { assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(consumerFn))) }) } @@ -116,8 +119,8 @@ func TestShutdownWhileNotEmpty(t *testing.T) { consumerState := newConsumerState(t) - assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) { - consumerState.record(item.(stringRequest).str) + assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item *Request) { + consumerState.record(item.Request.(stringRequest).str) time.Sleep(1 * time.Second) }))) @@ -198,7 +201,7 @@ func (s *consumerState) assertConsumed(expected map[string]bool) { func TestZeroSize(t *testing.T) { q := NewBoundedMemoryQueue(0, 1) - err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {})) + err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item *Request) {})) assert.NoError(t, err) assert.False(t, q.Produce(newStringRequest("a"))) // in process diff --git a/exporter/exporterhelper/internal/persistent_queue.go 
b/exporter/exporterhelper/internal/persistent_queue.go index ba1dcc67230..452a5009e45 100644 --- a/exporter/exporterhelper/internal/persistent_queue.go +++ b/exporter/exporterhelper/internal/persistent_queue.go @@ -80,7 +80,7 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q } // Produce adds an item to the queue and returns true if it was accepted -func (pq *persistentQueue) Produce(item Request) bool { +func (pq *persistentQueue) Produce(item *Request) bool { err := pq.storage.put(item) return err == nil } diff --git a/exporter/exporterhelper/internal/persistent_queue_test.go b/exporter/exporterhelper/internal/persistent_queue_test.go index 08570e92342..6eabe4262eb 100644 --- a/exporter/exporterhelper/internal/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/persistent_queue_test.go @@ -32,7 +32,7 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component { } // createTestQueue creates and starts a fake queue with the given capacity and number of consumers. -func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(item Request)) ProducerConsumerQueue { +func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(item *Request)) ProducerConsumerQueue { pq := NewPersistentQueue(capacity, numConsumers, component.ID{}, newFakeTracesRequestMarshalerFunc(), newFakeTracesRequestUnmarshalerFunc()) host := &mockHost{ext: map[component.ID]component.Component{ @@ -51,7 +51,7 @@ func TestPersistentQueue_Capacity(t *testing.T) { host := &mockHost{ext: map[component.ID]component.Component{ {}: NewMockStorageExtension(nil), }} - err := pq.Start(context.Background(), host, newNopQueueSettings(func(req Request) {})) + err := pq.Start(context.Background(), host, newNopQueueSettings(func(req *Request) {})) require.NoError(t, err) // Stop consumer to imitate queue overflow @@ -85,7 +85,7 @@ func TestPersistentQueue_Capacity(t *testing.T) { } func TestPersistentQueue_Close(t *testing.T) { - wq := createTestQueue(t, 1001, 100, func(item Request) {}) + wq := createTestQueue(t, 1001, 100, func(item *Request) {}) traces := newTraces(1, 10) req := newFakeTracesRequest(traces) @@ -104,7 +104,7 @@ func TestPersistentQueue_Close(t *testing.T) { // Verify storage closes after queue consumers. 
If not in this order, successfully consumed items won't be updated in storage func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) { - wq := createTestQueue(t, 1001, 1, func(item Request) {}) + wq := createTestQueue(t, 1001, 1, func(item *Request) {}) traces := newTraces(1, 10) lastRequestProcessedTime := time.Now() @@ -163,7 +163,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) { req := newFakeTracesRequest(traces) numMessagesConsumed := &atomic.Int32{} - tq := createTestQueue(t, 1000, c.numConsumers, func(item Request) { + tq := createTestQueue(t, 1000, c.numConsumers, func(item *Request) { if item != nil { numMessagesConsumed.Add(int32(1)) } diff --git a/exporter/exporterhelper/internal/persistent_storage.go b/exporter/exporterhelper/internal/persistent_storage.go index 49f204bbb0d..00d47f49a28 100644 --- a/exporter/exporterhelper/internal/persistent_storage.go +++ b/exporter/exporterhelper/internal/persistent_storage.go @@ -50,7 +50,7 @@ type persistentContiguousStorage struct { stopOnce sync.Once capacity uint64 - reqChan chan Request + reqChan chan *Request mu sync.Mutex readIndex itemIndex @@ -91,7 +91,7 @@ func newPersistentContiguousStorage(ctx context.Context, queueName string, clien marshaler: marshaler, capacity: capacity, putChan: make(chan struct{}, capacity), - reqChan: make(chan Request), + reqChan: make(chan *Request), stopChan: make(chan struct{}), itemsCount: &atomic.Uint64{}, } @@ -145,7 +145,7 @@ func initPersistentContiguousStorage(ctx context.Context, pcs *persistentContigu pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex)) } -func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []Request) { +func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []*Request) { if len(reqs) > 0 { errCount := 0 for _, req := range reqs { @@ -183,7 +183,7 @@ func (pcs *persistentContiguousStorage) loop() { } // get returns the request channel that all the requests will be send on -func (pcs *persistentContiguousStorage) get() <-chan Request { +func (pcs *persistentContiguousStorage) get() <-chan *Request { return pcs.reqChan } @@ -203,7 +203,7 @@ func (pcs *persistentContiguousStorage) stop() { } // put marshals the request and puts it into the persistent queue -func (pcs *persistentContiguousStorage) put(req Request) error { +func (pcs *persistentContiguousStorage) put(req *Request) error { // Nil requests are ignored if req == nil { return nil @@ -231,7 +231,7 @@ func (pcs *persistentContiguousStorage) put(req Request) error { } // getNextItem pulls the next available item from the persistent storage; if none is found, returns (nil, false) -func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Request, bool) { +func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (*Request, bool) { pcs.mu.Lock() defer pcs.mu.Unlock() @@ -244,7 +244,7 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Reques pcs.updateReadIndex(ctx) pcs.itemDispatchingStart(ctx, index) - var req Request + var req *Request batch, err := newBatch(pcs).get(pcs.itemKey(index)).execute(ctx) if err == nil { req, err = batch.getRequestResult(pcs.itemKey(index)) @@ -261,14 +261,14 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Reques } // If all went well so far, cleanup will be handled by callback - req.SetOnProcessingFinished(func() { + req.processingFinishedCallback = func() { pcs.mu.Lock() defer pcs.mu.Unlock() if err := pcs.itemDispatchingFinish(ctx, 
index); err != nil { pcs.logger.Error("Error deleting item from queue", zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err)) } - }) + } return req, true } @@ -278,8 +278,8 @@ func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Reques // retrieveNotDispatchedReqs gets the items for which sending was not finished, cleans the storage // and moves the items back to the queue. The function returns an array which might contain nils // if unmarshalling of the value at a given index was not possible. -func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []Request { - var reqs []Request +func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []*Request { + var reqs []*Request var dispatchedItems []itemIndex pcs.mu.Lock() @@ -302,7 +302,7 @@ func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Co pcs.logger.Debug("No items left for dispatch by consumers") } - reqs = make([]Request, len(dispatchedItems)) + reqs = make([]*Request, len(dispatchedItems)) keys := make([]string, len(dispatchedItems)) retrieveBatch := newBatch(pcs) cleanupBatch := newBatch(pcs) diff --git a/exporter/exporterhelper/internal/persistent_storage_batch.go b/exporter/exporterhelper/internal/persistent_storage_batch.go index a80ba93c5c3..50a5f34fee8 100644 --- a/exporter/exporterhelper/internal/persistent_storage_batch.go +++ b/exporter/exporterhelper/internal/persistent_storage_batch.go @@ -93,7 +93,7 @@ func (bof *batchStruct) getResult(key string, unmarshal func([]byte) (any, error // getRequestResult returns the result of a Get operation as a request // If the value cannot be retrieved, it returns an error -func (bof *batchStruct) getRequestResult(key string) (Request, error) { +func (bof *batchStruct) getRequestResult(key string) (*Request, error) { reqIf, err := bof.getResult(key, bof.bytesToRequest) if err != nil { return nil, err @@ -102,7 +102,7 @@ func (bof *batchStruct) getRequestResult(key string) (Request, error) { return nil, errValueNotSet } - return reqIf.(Request), nil + return reqIf.(*Request), nil } // getItemIndexResult returns the result of a Get operation as an itemIndex @@ -136,7 +136,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error) } // setRequest adds Set operation over a given request to the batch -func (bof *batchStruct) setRequest(key string, value Request) *batchStruct { +func (bof *batchStruct) setRequest(key string, value *Request) *batchStruct { return bof.set(key, value, bof.requestToBytes) } @@ -207,7 +207,7 @@ func bytesToItemIndexArray(b []byte) (any, error) { } func (bof *batchStruct) requestToBytes(req any) ([]byte, error) { - return bof.pcs.marshaler(req.(Request)) + return bof.pcs.marshaler(req.(*Request)) } func (bof *batchStruct) bytesToRequest(b []byte) (any, error) { diff --git a/exporter/exporterhelper/internal/persistent_storage_test.go b/exporter/exporterhelper/internal/persistent_storage_test.go index d48bbd75285..b8fffd7d824 100644 --- a/exporter/exporterhelper/internal/persistent_storage_test.go +++ b/exporter/exporterhelper/internal/persistent_storage_test.go @@ -41,29 +41,21 @@ func createTestPersistentStorage(client storage.Client) *persistentContiguousSto } type fakeTracesRequest struct { - td ptrace.Traces - processingFinishedCallback func() - Request + td ptrace.Traces } -func newFakeTracesRequest(td ptrace.Traces) *fakeTracesRequest { - return &fakeTracesRequest{ - td: td, - } -} - -func (fd *fakeTracesRequest) 
OnProcessingFinished() { - if fd.processingFinishedCallback != nil { - fd.processingFinishedCallback() - } +func (ftr *fakeTracesRequest) Export(context.Context) error { + return nil } -func (fd *fakeTracesRequest) SetOnProcessingFinished(callback func()) { - fd.processingFinishedCallback = callback +func newFakeTracesRequest(td ptrace.Traces) *Request { + return NewRequest(context.Background(), &fakeTracesRequest{ + td: td, + }) } func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { - return func(bytes []byte) (Request, error) { + return func(bytes []byte) (*Request, error) { unmarshaler := ptrace.ProtoUnmarshaler{} traces, err := unmarshaler.UnmarshalTraces(bytes) if err != nil { @@ -74,9 +66,9 @@ func newFakeTracesRequestUnmarshalerFunc() RequestUnmarshaler { } func newFakeTracesRequestMarshalerFunc() RequestMarshaler { - return func(req Request) ([]byte, error) { + return func(req *Request) ([]byte, error) { marshaler := ptrace.ProtoMarshaler{} - return marshaler.MarshalTraces(req.(*fakeTracesRequest).td) + return marshaler.MarshalTraces(req.Request.(*fakeTracesRequest).td) } } @@ -212,7 +204,7 @@ func TestPersistentStorage_CurrentlyProcessedItems(t *testing.T) { // Now, this will take item 0 and pull item 1 into the unbuffered channel readReq := <-ps.get() - assert.Equal(t, req.td, readReq.(*fakeTracesRequest).td) + assert.Equal(t, req.Request.(*fakeTracesRequest).td, readReq.Request.(*fakeTracesRequest).td) requireCurrentlyDispatchedItemsEqual(t, ps, []itemIndex{0, 1}) // This takes item 1 from channel and pulls another one (item 2) into the unbuffered channel @@ -326,10 +318,10 @@ func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) { // Lets read both of the elements we put readReq := <-ps.get() - require.Equal(t, req.td, readReq.(*fakeTracesRequest).td) + require.Equal(t, req.Request.(*fakeTracesRequest).td, readReq.Request.(*fakeTracesRequest).td) readReq = <-ps.get() - require.Equal(t, req.td, readReq.(*fakeTracesRequest).td) + require.Equal(t, req.Request.(*fakeTracesRequest).td, readReq.Request.(*fakeTracesRequest).td) require.Equal(t, uint64(0), ps.size()) err = ext.Shutdown(context.Background()) @@ -338,7 +330,7 @@ func TestPersistentStorage_RepeatPutCloseReadClose(t *testing.T) { // No more items ext := NewMockStorageExtension(nil) - wq := createTestQueue(t, 1000, 1, func(Request) {}) + wq := createTestQueue(t, 1000, 1, func(*Request) {}) require.Equal(t, 0, wq.Size()) require.NoError(t, ext.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/internal/producer_consumer_queue.go b/exporter/exporterhelper/internal/producer_consumer_queue.go index 7b17106a564..f98b733172b 100644 --- a/exporter/exporterhelper/internal/producer_consumer_queue.go +++ b/exporter/exporterhelper/internal/producer_consumer_queue.go @@ -15,7 +15,7 @@ import ( type QueueSettings struct { exporter.CreateSettings DataType component.DataType - Callback func(item Request) + Callback func(item *Request) } // ProducerConsumerQueue defines a producer-consumer exchange which can be backed by e.g. the memory-based ring buffer queue @@ -26,7 +26,7 @@ type ProducerConsumerQueue interface { Start(ctx context.Context, host component.Host, set QueueSettings) error // Produce is used by the producer to submit new item to the queue. Returns false if the item wasn't added // to the queue due to queue overflow. 
- Produce(item Request) bool + Produce(item *Request) bool // Size returns the current Size of the queue Size() int // Stop stops all consumers, as well as the length reporter if started, diff --git a/exporter/exporterhelper/internal/request.go b/exporter/exporterhelper/internal/request.go index 454a42782ce..71e68cb467f 100644 --- a/exporter/exporterhelper/internal/request.go +++ b/exporter/exporterhelper/internal/request.go @@ -3,34 +3,58 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" -import "context" +import ( + "context" + + "go.opentelemetry.io/collector/exporter/exporterhelper/request" +) // Request defines capabilities required for persistent storage of a request -type Request interface { - // Context returns the context.Context of the requests. - Context() context.Context +type Request struct { + request.Request + ctx context.Context + processingFinishedCallback func() +} - // SetContext updates the context.Context of the requests. - SetContext(context.Context) +func NewRequest(ctx context.Context, req request.Request) *Request { + return &Request{Request: req, ctx: ctx} +} - Export(ctx context.Context) error +func (req *Request) OnError(err error) *Request { + if r, ok := req.Request.(request.ErrorHandler); ok { + return &Request{ + Request: r.OnError(err), + ctx: req.ctx, + } + } + return req +} - // OnError returns a new Request may contain the items left to be sent if some items failed to process and can be retried. - // Otherwise, it should return the original Request. - OnError(error) Request +// Count returns a number of items in the request. If the request does not implement RequestItemsCounter +// then 0 is returned. +func (req *Request) Count() int { + if counter, ok := req.Request.(request.ItemsCounter); ok { + return counter.ItemsCount() + } + return 0 +} - // Count returns the count of spans/metric points or log records. - Count() int +func (req *Request) Context() context.Context { + return req.ctx +} - // OnProcessingFinished calls the optional callback function to handle cleanup after all processing is finished - OnProcessingFinished() +func (req *Request) SetContext(ctx context.Context) { + req.ctx = ctx +} - // SetOnProcessingFinished allows to set an optional callback function to do the cleanup (e.g. 
remove the item from persistent queue) - SetOnProcessingFinished(callback func()) +func (req *Request) OnProcessingFinished() { + if req.processingFinishedCallback != nil { + req.processingFinishedCallback() + } } // RequestUnmarshaler defines a function which takes a byte slice and unmarshals it into a relevant request -type RequestUnmarshaler func([]byte) (Request, error) +type RequestUnmarshaler func([]byte) (*Request, error) // RequestMarshaler defines a function which takes a request and marshals it into a byte slice -type RequestMarshaler func(Request) ([]byte, error) +type RequestMarshaler func(*Request) ([]byte, error) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index b098e722921..e8940713e28 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/pdata/plog" ) @@ -21,21 +22,19 @@ var logsMarshaler = &plog.ProtoMarshaler{} var logsUnmarshaler = &plog.ProtoUnmarshaler{} type logsRequest struct { - baseRequest ld plog.Logs pusher consumer.ConsumeLogsFunc } -func newLogsRequest(ctx context.Context, ld plog.Logs, pusher consumer.ConsumeLogsFunc) internal.Request { - return &logsRequest{ - baseRequest: baseRequest{ctx: ctx}, - ld: ld, - pusher: pusher, - } +func newLogsRequest(ctx context.Context, ld plog.Logs, pusher consumer.ConsumeLogsFunc) *internal.Request { + return internal.NewRequest(ctx, &logsRequest{ + ld: ld, + pusher: pusher, + }) } func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.RequestUnmarshaler { - return func(bytes []byte) (internal.Request, error) { + return func(bytes []byte) (*internal.Request, error) { logs, err := logsUnmarshaler.UnmarshalLogs(bytes) if err != nil { return nil, err @@ -44,14 +43,17 @@ func newLogsRequestUnmarshalerFunc(pusher consumer.ConsumeLogsFunc) internal.Req } } -func logsRequestMarshaler(req internal.Request) ([]byte, error) { - return logsMarshaler.MarshalLogs(req.(*logsRequest).ld) +func logsRequestMarshaler(req *internal.Request) ([]byte, error) { + return logsMarshaler.MarshalLogs(req.Request.(*logsRequest).ld) } -func (req *logsRequest) OnError(err error) internal.Request { +func (req *logsRequest) OnError(err error) request.Request { var logError consumererror.Logs if errors.As(err, &logError) { - return newLogsRequest(req.ctx, logError.Data(), req.pusher) + return &logsRequest{ + ld: logError.Data(), + pusher: req.pusher, + } } return req } @@ -60,7 +62,7 @@ func (req *logsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.ld) } -func (req *logsRequest) Count() int { +func (req *logsRequest) ItemsCount() int { return req.ld.LogRecordCount() } @@ -115,7 +117,7 @@ func NewLogsExporter( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type LogsConverter interface { // RequestFromLogs converts plog.Logs data into a request. - RequestFromLogs(context.Context, plog.Logs) (Request, error) + RequestFromLogs(context.Context, plog.Logs) (request.Request, error) } // NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender. 
@@ -148,7 +150,7 @@ func NewLogsRequestExporter( zap.Error(err)) return consumererror.NewPermanent(cErr) } - r := newRequest(ctx, req) + r := internal.NewRequest(ctx, req) sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordLogsEnqueueFailure(r.Context(), int64(r.Count())) @@ -171,7 +173,7 @@ func newLogsExporterWithObservability(obsrep *obsExporter) requestSender { return &logsExporterWithObservability{obsrep: obsrep} } -func (lewo *logsExporterWithObservability) send(req internal.Request) error { +func (lewo *logsExporterWithObservability) send(req *internal.Request) error { req.SetContext(lewo.obsrep.StartLogsOp(req.Context())) err := lewo.nextSender.send(req) lewo.obsrep.EndLogsOp(req.Context(), req.Count(), err) diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index e3f09f361c7..099a74d46f7 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -21,21 +22,19 @@ var metricsMarshaler = &pmetric.ProtoMarshaler{} var metricsUnmarshaler = &pmetric.ProtoUnmarshaler{} type metricsRequest struct { - baseRequest md pmetric.Metrics pusher consumer.ConsumeMetricsFunc } -func newMetricsRequest(ctx context.Context, md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) internal.Request { - return &metricsRequest{ - baseRequest: baseRequest{ctx: ctx}, - md: md, - pusher: pusher, - } +func newMetricsRequest(ctx context.Context, md pmetric.Metrics, pusher consumer.ConsumeMetricsFunc) *internal.Request { + return internal.NewRequest(ctx, &metricsRequest{ + md: md, + pusher: pusher, + }) } func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) internal.RequestUnmarshaler { - return func(bytes []byte) (internal.Request, error) { + return func(bytes []byte) (*internal.Request, error) { metrics, err := metricsUnmarshaler.UnmarshalMetrics(bytes) if err != nil { return nil, err @@ -44,14 +43,17 @@ func newMetricsRequestUnmarshalerFunc(pusher consumer.ConsumeMetricsFunc) intern } } -func metricsRequestMarshaler(req internal.Request) ([]byte, error) { - return metricsMarshaler.MarshalMetrics(req.(*metricsRequest).md) +func metricsRequestMarshaler(req *internal.Request) ([]byte, error) { + return metricsMarshaler.MarshalMetrics(req.Request.(*metricsRequest).md) } -func (req *metricsRequest) OnError(err error) internal.Request { +func (req *metricsRequest) OnError(err error) request.Request { var metricsError consumererror.Metrics if errors.As(err, &metricsError) { - return newMetricsRequest(req.ctx, metricsError.Data(), req.pusher) + return &metricsRequest{ + md: metricsError.Data(), + pusher: req.pusher, + } } return req } @@ -60,7 +62,7 @@ func (req *metricsRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.md) } -func (req *metricsRequest) Count() int { +func (req *metricsRequest) ItemsCount() int { return req.md.DataPointCount() } @@ -115,7 +117,7 @@ func NewMetricsExporter( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type MetricsConverter interface { // RequestFromMetrics converts pdata.Metrics into a request. 
- RequestFromMetrics(context.Context, pmetric.Metrics) (Request, error) + RequestFromMetrics(context.Context, pmetric.Metrics) (request.Request, error) } // NewMetricsRequestExporter creates a new metrics exporter based on a custom MetricsConverter and RequestSender. @@ -148,7 +150,7 @@ func NewMetricsRequestExporter( zap.Error(err)) return consumererror.NewPermanent(cErr) } - r := newRequest(ctx, req) + r := internal.NewRequest(ctx, req) sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordMetricsEnqueueFailure(r.Context(), int64(r.Count())) @@ -171,7 +173,7 @@ func newMetricsSenderWithObservability(obsrep *obsExporter) requestSender { return &metricsSenderWithObservability{obsrep: obsrep} } -func (mewo *metricsSenderWithObservability) send(req internal.Request) error { +func (mewo *metricsSenderWithObservability) send(req *internal.Request) error { req.SetContext(mewo.obsrep.StartMetricsOp(req.Context())) err := mewo.nextSender.send(req) mewo.obsrep.EndMetricsOp(req.Context(), req.Count(), err) diff --git a/exporter/exporterhelper/queue_sender.go b/exporter/exporterhelper/queue_sender.go index d3da29500fe..83f202ee163 100644 --- a/exporter/exporterhelper/queue_sender.go +++ b/exporter/exporterhelper/queue_sender.go @@ -86,7 +86,7 @@ func newQueueSender(id component.ID, signal component.DataType, queue internal.P } } -func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req internal.Request, err error) error { +func (qs *queueSender) onTemporaryFailure(logger *zap.Logger, req *internal.Request, err error) error { if !qs.requeuingEnabled || qs.queue == nil { logger.Error( "Exporting failed. No more retries left. Dropping data.", @@ -120,7 +120,7 @@ func (qs *queueSender) start(ctx context.Context, host component.Host, set expor err := qs.queue.Start(ctx, host, internal.QueueSettings{ CreateSettings: set, DataType: qs.signal, - Callback: func(item internal.Request) { + Callback: func(item *internal.Request) { _ = qs.nextSender.send(item) item.OnProcessingFinished() }, @@ -161,7 +161,7 @@ func (qs *queueSender) shutdown() { } // send implements the requestSender interface -func (qs *queueSender) send(req internal.Request) error { +func (qs *queueSender) send(req *internal.Request) error { if qs.queue == nil { err := qs.nextSender.send(req) if err != nil { diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index d5c7f00de6b..e1bbfde6a47 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -48,7 +48,7 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { assert.NoError(t, be.Shutdown(context.Background())) - secondMockR.checkNumRequests(t, 1) + secondMockR.Request.(*mockRequest).checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 3) ocs.checkDroppedItemsCount(t, 7) require.Zero(t, be.queueSender.(*queueSender).queue.Size()) @@ -75,7 +75,7 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { }) ocs.awaitAsyncProcessing() - mockR.checkNumRequests(t, 1) + mockR.Request.(*mockRequest).checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) require.Zero(t, be.queueSender.(*queueSender).queue.Size()) @@ -110,7 +110,7 @@ func TestQueuedRetryHappyPath(t *testing.T) { }) wantRequests := 10 - reqs := make([]*mockRequest, 0, 10) + reqs := make([]*internal.Request, 0, 10) for i := 0; i < wantRequests; i++ { ocs.run(func() { req := newMockRequest(context.Background(), 2, nil) @@ -124,7 +124,7 @@ func 
TestQueuedRetryHappyPath(t *testing.T) { require.Len(t, reqs, wantRequests) for _, req := range reqs { - req.checkNumRequests(t, 1) + req.Request.(*mockRequest).checkNumRequests(t, 1) } ocs.checkSendItemsCount(t, 2*wantRequests) @@ -202,7 +202,7 @@ func TestQueuedRetry_RequeuingEnabled(t *testing.T) { ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 2) + mockR.Request.(*mockRequest).checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 1) ocs.checkDroppedItemsCount(t, 1) // not actually dropped, but ocs counts each failed send here } @@ -226,7 +226,7 @@ func TestQueuedRetry_RequeuingEnabledQueueFull(t *testing.T) { mockR := newMockRequest(context.Background(), 1, traceErr) require.Error(t, be.retrySender.send(mockR), "sending_queue is full") - mockR.checkNumRequests(t, 1) + mockR.Request.(*mockRequest).checkNumRequests(t, 1) } func TestQueueRetryWithDisabledQueue(t *testing.T) { @@ -243,7 +243,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { require.Error(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() - mockR.checkNumRequests(t, 1) + mockR.Request.(*mockRequest).checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 0) ocs.checkDroppedItemsCount(t, 2) require.NoError(t, be.Shutdown(context.Background())) @@ -283,7 +283,8 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := NewDefaultRetrySettings() set := tt.ToExporterCreateSettings() - be, err := newBaseExporter(set, "", false, mockRequestMarshaler, mockRequestUnmarshaler(&mockRequest{}), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + unmarshaler := mockRequestUnmarshaler(newMockRequest(context.Background(), 1, nil)) + be, err := newBaseExporter(set, "", false, mockRequestMarshaler, unmarshaler, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) var extensions = map[component.ID]component.Component{ diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go deleted file mode 100644 index ef05aa6395d..00000000000 --- a/exporter/exporterhelper/request.go +++ /dev/null @@ -1,61 +0,0 @@ -// Copyright The OpenTelemetry Authors -// SPDX-License-Identifier: Apache-2.0 - -package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" - -import ( - "context" - - "go.opentelemetry.io/collector/exporter/exporterhelper/internal" -) - -// Request represents a single request that can be sent to an external endpoint. -// This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type Request interface { - // Export exports the request to an external endpoint. - Export(ctx context.Context) error -} - -// RequestItemsCounter is an optional interface that can be implemented by Request to provide a number of items -// in the request. This is a recommended interface to implement for exporters. It is required for batching and queueing -// based on number of items. Also, it's used for reporting number of items in collector's logs, metrics and traces. -// If not implemented, collector's logs, metrics and traces will report 0 items. -// This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. 
-type RequestItemsCounter interface { - // ItemsCount returns a number of basic items in the request where item is the smallest piece of data that can be - // sent. For example, for OTLP exporter, this value represents the number of spans, - // metric data points or log records. - ItemsCount() int -} - -type request struct { - Request - baseRequest -} - -var _ internal.Request = (*request)(nil) - -func newRequest(ctx context.Context, req Request) *request { - return &request{ - Request: req, - baseRequest: baseRequest{ctx: ctx}, - } -} - -func (req *request) OnError(_ error) internal.Request { - // Potentially we could introduce a new RequestError type that would represent partially succeeded request. - // In that case we should consider returning them back to the pipeline converted back to pdata in case if - // sending queue is disabled. We leave it as a future improvement if decided that it's needed. - return req -} - -// Count returns a number of items in the request. If the request does not implement RequestItemsCounter -// then 0 is returned. -func (req *request) Count() int { - if counter, ok := req.Request.(RequestItemsCounter); ok { - return counter.ItemsCount() - } - return 0 -} diff --git a/exporter/exporterhelper/request/request.go b/exporter/exporterhelper/request/request.go new file mode 100644 index 00000000000..32499c9fbcc --- /dev/null +++ b/exporter/exporterhelper/request/request.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package request // import "go.opentelemetry.io/collector/exporter/exporterhelper/request" + +import ( + "context" +) + +// Request represents a single request that can be sent to an external endpoint. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Request interface { + // Export exports the request to an external endpoint. + Export(ctx context.Context) error +} + +// ItemsCounter is an optional interface that can be implemented by Request to provide a number of items +// in the request. This is a recommended interface to implement for exporters. It is required for batching and queueing +// based on number of items. Also, it's used for reporting number of items in collector's logs, metrics and traces. +// If not implemented, collector's logs, metrics and traces will report 0 items. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type ItemsCounter interface { + // ItemsCount returns a number of basic items in the request where item is the smallest piece of data that can be + // sent. For example, for OTLP exporter, this value represents the number of spans, + // metric data points or log records. + ItemsCount() int +} + +// ErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial +// temporary failures. For example, if some items failed to process and can be retried, this interface allows to +// return a new Request that contains the items left to be sent. Otherwise, the original Request should be returned. +// If not implemented, the original Request will be returned assuming the error is applied to the whole Request. 
+// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type ErrorHandler interface { + // OnError returns a new Request may contain the items left to be sent if some items failed to process and can be retried. + // Otherwise, it should return the original Request. + OnError(error) Request +} + +// Marshaler is a function that can marshal a Request into bytes. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Marshaler func(req Request) ([]byte, error) + +// Unmarshaler is a function that can unmarshal bytes into a Request. +// This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Unmarshaler func(data []byte) (Request, error) diff --git a/exporter/exporterhelper/request_test.go b/exporter/exporterhelper/request_test.go index 6dd3f67800a..2f03fc4d621 100644 --- a/exporter/exporterhelper/request_test.go +++ b/exporter/exporterhelper/request_test.go @@ -6,6 +6,7 @@ package exporterhelper import ( "context" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -31,14 +32,14 @@ type fakeRequestConverter struct { requestError error } -func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (Request, error) { +func (c fakeRequestConverter) RequestFromMetrics(_ context.Context, md pmetric.Metrics) (request.Request, error) { return fakeRequest{items: md.DataPointCount(), err: c.requestError}, c.metricsError } -func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (Request, error) { +func (c fakeRequestConverter) RequestFromTraces(_ context.Context, td ptrace.Traces) (request.Request, error) { return fakeRequest{items: td.SpanCount(), err: c.requestError}, c.tracesError } -func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (Request, error) { +func (c fakeRequestConverter) RequestFromLogs(_ context.Context, ld plog.Logs) (request.Request, error) { return fakeRequest{items: ld.LogRecordCount(), err: c.requestError}, c.logsError } diff --git a/exporter/exporterhelper/retry_sender.go b/exporter/exporterhelper/retry_sender.go index 14a90a9c1e6..f896ad20ce1 100644 --- a/exporter/exporterhelper/retry_sender.go +++ b/exporter/exporterhelper/retry_sender.go @@ -73,7 +73,7 @@ func NewThrottleRetry(err error, delay time.Duration) error { } } -type onRequestHandlingFinishedFunc func(*zap.Logger, internal.Request, error) error +type onRequestHandlingFinishedFunc func(*zap.Logger, *internal.Request, error) error type retrySender struct { baseRequestSender @@ -86,7 +86,7 @@ type retrySender struct { func newRetrySender(id component.ID, rCfg RetrySettings, logger *zap.Logger, onTemporaryFailure onRequestHandlingFinishedFunc) *retrySender { if onTemporaryFailure == nil { - onTemporaryFailure = func(logger *zap.Logger, req internal.Request, err error) error { + onTemporaryFailure = func(logger *zap.Logger, req *internal.Request, err error) error { return err } } @@ -104,7 +104,7 @@ func (rs *retrySender) shutdown() { } // send implements the requestSender interface -func 
(rs *retrySender) send(req internal.Request) error { +func (rs *retrySender) send(req *internal.Request) error { if !rs.cfg.Enabled { err := rs.nextSender.send(req) if err != nil { diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go index 1da5fb29ebd..ab4fcbb4ccd 100644 --- a/exporter/exporterhelper/retry_sender_test.go +++ b/exporter/exporterhelper/retry_sender_test.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "sync" "sync/atomic" "testing" @@ -26,13 +27,13 @@ import ( "go.opentelemetry.io/collector/internal/testdata" ) -func mockRequestUnmarshaler(mr *mockRequest) internal.RequestUnmarshaler { - return func(bytes []byte) (internal.Request, error) { +func mockRequestUnmarshaler(mr *internal.Request) internal.RequestUnmarshaler { + return func(bytes []byte) (*internal.Request, error) { return mr, nil } } -func mockRequestMarshaler(_ internal.Request) ([]byte, error) { +func mockRequestMarshaler(_ *internal.Request) ([]byte, error) { return nil, nil } @@ -54,7 +55,7 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 1) + mockR.Request.(*mockRequest).checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 0) ocs.checkDroppedItemsCount(t, 2) } @@ -80,7 +81,7 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { }) ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 1) + mockR.Request.(*mockRequest).checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 0) ocs.checkDroppedItemsCount(t, 2) } @@ -107,7 +108,7 @@ func TestQueuedRetry_OnError(t *testing.T) { ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 2) + mockR.Request.(*mockRequest).checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) } @@ -145,7 +146,7 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { assert.Less(t, waitingTime, 150*time.Millisecond) // In the newMockConcurrentExporter we count requests and items even for failed requests. - mockR.checkNumRequests(t, 1) + mockR.Request.(*mockRequest).checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 7) require.Zero(t, be.queueSender.(*queueSender).queue.Size()) @@ -184,7 +185,7 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { // The initial backoff is 10ms, but because of the throttle this should wait at least 100ms. 
assert.True(t, 100*time.Millisecond < time.Since(start)) - mockR.checkNumRequests(t, 2) + mockR.Request.(*mockRequest).checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) require.Zero(t, be.queueSender.(*queueSender).queue.Size()) @@ -212,7 +213,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { ocs.awaitAsyncProcessing() // In the newMockConcurrentExporter we count requests and items even for failed requests - mockR.checkNumRequests(t, 2) + mockR.Request.(*mockRequest).checkNumRequests(t, 2) ocs.checkSendItemsCount(t, 2) ocs.checkDroppedItemsCount(t, 0) require.Zero(t, be.queueSender.(*queueSender).queue.Size()) @@ -231,36 +232,27 @@ func TestQueueRetryWithNoQueue(t *testing.T) { require.Error(t, be.send(mockR)) }) ocs.awaitAsyncProcessing() - mockR.checkNumRequests(t, 1) + mockR.Request.(*mockRequest).checkNumRequests(t, 1) ocs.checkSendItemsCount(t, 0) ocs.checkDroppedItemsCount(t, 2) require.NoError(t, be.Shutdown(context.Background())) } -type mockErrorRequest struct { - baseRequest -} +type mockErrorRequest struct{} func (mer *mockErrorRequest) Export(_ context.Context) error { return errors.New("transient error") } -func (mer *mockErrorRequest) OnError(error) internal.Request { - return mer -} - -func (mer *mockErrorRequest) Count() int { +func (mer *mockErrorRequest) ItemsCount() int { return 7 } -func newErrorRequest(ctx context.Context) internal.Request { - return &mockErrorRequest{ - baseRequest: baseRequest{ctx: ctx}, - } +func newErrorRequest(ctx context.Context) *internal.Request { + return internal.NewRequest(ctx, &mockErrorRequest{}) } type mockRequest struct { - baseRequest cnt int mu sync.Mutex consumeError error @@ -280,9 +272,8 @@ func (m *mockRequest) Export(ctx context.Context) error { return ctx.Err() } -func (m *mockRequest) OnError(error) internal.Request { +func (m *mockRequest) OnError(error) request.Request { return &mockRequest{ - baseRequest: m.baseRequest, cnt: 1, consumeError: nil, requestCount: m.requestCount, @@ -295,17 +286,16 @@ func (m *mockRequest) checkNumRequests(t *testing.T, want int) { }, time.Second, 1*time.Millisecond) } -func (m *mockRequest) Count() int { +func (m *mockRequest) ItemsCount() int { return m.cnt } -func newMockRequest(ctx context.Context, cnt int, consumeError error) *mockRequest { - return &mockRequest{ - baseRequest: baseRequest{ctx: ctx}, +func newMockRequest(ctx context.Context, cnt int, consumeError error) *internal.Request { + return internal.NewRequest(ctx, &mockRequest{ cnt: cnt, consumeError: consumeError, requestCount: &atomic.Int64{}, - } + }) } type observabilityConsumerSender struct { @@ -323,7 +313,7 @@ func newObservabilityConsumerSender(_ *obsExporter) requestSender { } } -func (ocs *observabilityConsumerSender) send(req internal.Request) error { +func (ocs *observabilityConsumerSender) send(req *internal.Request) error { err := ocs.nextSender.send(req) if err != nil { ocs.droppedItemsCount.Add(int64(req.Count())) @@ -400,7 +390,7 @@ type producerConsumerQueueWithCounter struct { produceCounter *atomic.Uint32 } -func (pcq *producerConsumerQueueWithCounter) Produce(item internal.Request) bool { +func (pcq *producerConsumerQueueWithCounter) Produce(item *internal.Request) bool { pcq.produceCounter.Add(1) return pcq.ProducerConsumerQueue.Produce(item) } @@ -410,6 +400,6 @@ type errorRequestSender struct { errToReturn error } -func (rs *errorRequestSender) send(_ internal.Request) error { +func (rs *errorRequestSender) send(_ *internal.Request) error { return rs.errToReturn } diff 
--git a/exporter/exporterhelper/timeout_sender.go b/exporter/exporterhelper/timeout_sender.go index 11b85cf08be..34e3e77a930 100644 --- a/exporter/exporterhelper/timeout_sender.go +++ b/exporter/exporterhelper/timeout_sender.go @@ -29,7 +29,7 @@ type timeoutSender struct { cfg TimeoutSettings } -func (ts *timeoutSender) send(req internal.Request) error { +func (ts *timeoutSender) send(req *internal.Request) error { // Intentionally don't overwrite the context inside the request, because in case of retries deadline will not be // updated because this deadline most likely is before the next one. ctx := req.Context() diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 4b9e397ec43..d43eb02d7e0 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/request" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -21,21 +22,19 @@ var tracesMarshaler = &ptrace.ProtoMarshaler{} var tracesUnmarshaler = &ptrace.ProtoUnmarshaler{} type tracesRequest struct { - baseRequest td ptrace.Traces pusher consumer.ConsumeTracesFunc } -func newTracesRequest(ctx context.Context, td ptrace.Traces, pusher consumer.ConsumeTracesFunc) internal.Request { - return &tracesRequest{ - baseRequest: baseRequest{ctx: ctx}, - td: td, - pusher: pusher, - } +func newTracesRequest(ctx context.Context, td ptrace.Traces, pusher consumer.ConsumeTracesFunc) *internal.Request { + return internal.NewRequest(ctx, &tracesRequest{ + td: td, + pusher: pusher, + }) } func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) internal.RequestUnmarshaler { - return func(bytes []byte) (internal.Request, error) { + return func(bytes []byte) (*internal.Request, error) { traces, err := tracesUnmarshaler.UnmarshalTraces(bytes) if err != nil { return nil, err @@ -44,14 +43,17 @@ func newTraceRequestUnmarshalerFunc(pusher consumer.ConsumeTracesFunc) internal. } } -func tracesRequestMarshaler(req internal.Request) ([]byte, error) { - return tracesMarshaler.MarshalTraces(req.(*tracesRequest).td) +func tracesRequestMarshaler(req *internal.Request) ([]byte, error) { + return tracesMarshaler.MarshalTraces(req.Request.(*tracesRequest).td) } -func (req *tracesRequest) OnError(err error) internal.Request { +func (req *tracesRequest) OnError(err error) request.Request { var traceError consumererror.Traces if errors.As(err, &traceError) { - return newTracesRequest(req.ctx, traceError.Data(), req.pusher) + return &tracesRequest{ + td: traceError.Data(), + pusher: req.pusher, + } } return req } @@ -60,7 +62,7 @@ func (req *tracesRequest) Export(ctx context.Context) error { return req.pusher(ctx, req.td) } -func (req *tracesRequest) Count() int { +func (req *tracesRequest) ItemsCount() int { return req.td.SpanCount() } @@ -115,7 +117,7 @@ func NewTracesExporter( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type TracesConverter interface { // RequestFromTraces converts ptrace.Traces into a Request. - RequestFromTraces(context.Context, ptrace.Traces) (Request, error) + RequestFromTraces(context.Context, ptrace.Traces) (request.Request, error) } // NewTracesRequestExporter creates a new traces exporter based on a custom TracesConverter and RequestSender. 
@@ -148,7 +150,7 @@ func NewTracesRequestExporter( zap.Error(err)) return consumererror.NewPermanent(cErr) } - r := newRequest(ctx, req) + r := internal.NewRequest(ctx, req) sErr := be.send(r) if errors.Is(sErr, errSendingQueueIsFull) { be.obsrep.recordTracesEnqueueFailure(r.Context(), int64(r.Count())) @@ -171,7 +173,7 @@ func newTracesExporterWithObservability(obsrep *obsExporter) requestSender { return &tracesExporterWithObservability{obsrep: obsrep} } -func (tewo *tracesExporterWithObservability) send(req internal.Request) error { +func (tewo *tracesExporterWithObservability) send(req *internal.Request) error { req.SetContext(tewo.obsrep.StartTracesOp(req.Context())) // Forward the data to the next consumer (this pusher is the next). err := tewo.nextSender.send(req)
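
Illustrative sketch (not part of the patch): how a third-party exporter's request type would look against the relocated API after this change. The `myRequest` type, its fields, and the `example` package are hypothetical; only the `request.Request`, `request.ItemsCounter`, and `request.ErrorHandler` interfaces come from the new `exporterhelper/request` package introduced above.

// Hypothetical example of a custom exporter request built on the new
// go.opentelemetry.io/collector/exporter/exporterhelper/request package.
package example

import (
	"context"

	"go.opentelemetry.io/collector/exporter/exporterhelper/request"
)

// myRequest is a stand-in payload; in a real exporter the items would be
// spans, metric data points, or log records.
type myRequest struct {
	items  []string
	export func(ctx context.Context, items []string) error
}

// Export implements request.Request by pushing the items to the backend.
func (r *myRequest) Export(ctx context.Context) error {
	return r.export(ctx, r.items)
}

// ItemsCount implements the optional request.ItemsCounter interface so the
// helper can report item counts and queue/batch by number of items.
func (r *myRequest) ItemsCount() int {
	return len(r.items)
}

// OnError implements the optional request.ErrorHandler interface added by this
// change. A real implementation would inspect err and return a request holding
// only the items that still need to be retried; this sketch just returns the
// original request.
func (r *myRequest) OnError(err error) request.Request {
	return r
}

// Compile-time assertions that the optional interfaces are satisfied.
var (
	_ request.Request      = (*myRequest)(nil)
	_ request.ItemsCounter = (*myRequest)(nil)
	_ request.ErrorHandler = (*myRequest)(nil)
)

With a type like this, the helper constructors (NewLogsRequestExporter, NewMetricsRequestExporter, NewTracesRequestExporter) wrap the converter output via internal.NewRequest(ctx, req), so the context and OnProcessingFinished bookkeeping live in the internal struct while exporter authors implement only the small public interfaces.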