[exporterhelper] Convert internal request interface into a struct
At this point we have two interfaces: public and internal. Each interface has its own implementation, which makes the code hard to read.

This change replaces the internal interface with a struct that embeds the public interface. This requires moving the public interface to a separate package, which makes the module structure cleaner anyway.
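Roughly, the refactor gives the following shape. The Export method on the public interface, the NewRequest constructor, and the ctx/processingFinishedCallback fields are all visible in the diffs below; the package path and anything else not shown there is illustrative only, not the exact final API.

// Sketch only: the internal wrapper that replaces the old internal request
// interface. "request.Request" stands for the public interface after the move;
// its exact import path is not part of this excerpt.
package internal

import (
	"context"

	"go.opentelemetry.io/collector/exporter/exporterhelper/request" // hypothetical import path
)

// Request embeds the public interface and carries the bookkeeping that the
// removed baseRequest used to hold.
type Request struct {
	request.Request // the wrapped public request; exposes Export(ctx)

	ctx                        context.Context
	processingFinishedCallback func()
}

// NewRequest wraps a public request so queues and senders can pass it around
// as *Request.
func NewRequest(ctx context.Context, req request.Request) *Request {
	return &Request{Request: req, ctx: ctx}
}

// OnProcessingFinished runs the cleanup callback, if one was set.
func (req *Request) OnProcessingFinished() {
	if req.processingFinishedCallback != nil {
		req.processingFinishedCallback()
	}
}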
dmitryax committed Sep 13, 2023
1 parent 767b95e commit 26e138f
Showing 23 changed files with 284 additions and 265 deletions.
18 changes: 18 additions & 0 deletions .chloggen/exporter-helper-add-request-onerror.yaml
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add new optional interface `request.ErrorHandler` to the helper.

# One or more tracking issues or pull requests related to the change
issues: [8435]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
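The entry above only names the new interface; its definition is not part of this excerpt. A plausible shape for such an optional hook, assuming it lets a request replace itself before the next retry attempt, would be something like:

// Hypothetical sketch of request.ErrorHandler; the actual signature in the new
// request package may differ.
type ErrorHandler interface {
	Request

	// OnError inspects the error returned by a failed export and returns the
	// Request that should be retried (for example, only the failed items).
	OnError(err error) Request
}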
18 changes: 18 additions & 0 deletions .chloggen/exporter-helper-move-request.yaml
@@ -0,0 +1,18 @@
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: breaking

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Move request-related API to a separate package

# One or more tracking issues or pull requests related to the change
issues: [8435]

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
30 changes: 3 additions & 27 deletions exporter/exporterhelper/common.go
@@ -21,7 +21,7 @@ import (
type requestSender interface {
start(ctx context.Context, host component.Host, set exporter.CreateSettings) error
shutdown()
send(req internal.Request) error
send(req *internal.Request) error
setNextSender(nextSender requestSender)
}

@@ -37,7 +37,7 @@ func (b *baseRequestSender) start(context.Context, component.Host, exporter.Crea

func (b *baseRequestSender) shutdown() {}

func (b *baseRequestSender) send(req internal.Request) error {
func (b *baseRequestSender) send(req *internal.Request) error {
return b.nextSender.send(req)
}

@@ -47,30 +47,6 @@ func (b *baseRequestSender) setNextSender(nextSender requestSender) {

type obsrepSenderFactory func(obsrep *obsExporter) requestSender

// baseRequest is a base implementation for the internal.Request.
type baseRequest struct {
ctx context.Context
processingFinishedCallback func()
}

func (req *baseRequest) Context() context.Context {
return req.ctx
}

func (req *baseRequest) SetContext(ctx context.Context) {
req.ctx = ctx
}

func (req *baseRequest) SetOnProcessingFinished(callback func()) {
req.processingFinishedCallback = callback
}

func (req *baseRequest) OnProcessingFinished() {
if req.processingFinishedCallback != nil {
req.processingFinishedCallback()
}
}

// Option apply changes to baseExporter.
type Option func(*baseExporter)

@@ -199,7 +175,7 @@ func newBaseExporter(set exporter.CreateSettings, signal component.DataType, req
}

// send sends the request using the first sender in the chain.
func (be *baseExporter) send(req internal.Request) error {
func (be *baseExporter) send(req *internal.Request) error {
return be.queueSender.send(req)
}

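The requestSender chain itself is unchanged apart from the pointer type: every sender does its own piece of work and hands the *internal.Request to the next sender. A hypothetical sender (not part of this commit) that plugs into the chain only needs to embed baseRequestSender and override send, for example:

// countingSender is illustrative only and assumes it sits alongside the code
// above in the exporterhelper package; it needs "sync/atomic" imported.
type countingSender struct {
	baseRequestSender
	sent atomic.Int64
}

func (cs *countingSender) send(req *internal.Request) error {
	cs.sent.Add(1)
	// baseRequestSender.send forwards to whatever was wired in via setNextSender.
	return cs.baseRequestSender.send(req)
}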
6 changes: 3 additions & 3 deletions exporter/exporterhelper/internal/bounded_memory_queue.go
@@ -20,7 +20,7 @@ type boundedMemoryQueue struct {
stopWG sync.WaitGroup
size *atomic.Uint32
stopped *atomic.Bool
items chan Request
items chan *Request
capacity uint32
numConsumers int
}
@@ -29,7 +29,7 @@ type boundedMemoryQueue struct {
// callback for dropped items (e.g. useful to emit metrics).
func NewBoundedMemoryQueue(capacity int, numConsumers int) ProducerConsumerQueue {
return &boundedMemoryQueue{
items: make(chan Request, capacity),
items: make(chan *Request, capacity),
stopped: &atomic.Bool{},
size: &atomic.Uint32{},
capacity: uint32(capacity),
@@ -58,7 +58,7 @@ func (q *boundedMemoryQueue) Start(_ context.Context, _ component.Host, set Queu
}

// Produce is used by the producer to submit new item to the queue. Returns false in case of queue overflow.
func (q *boundedMemoryQueue) Produce(item Request) bool {
func (q *boundedMemoryQueue) Produce(item *Request) bool {
if q.stopped.Load() {
return false
}
25 changes: 14 additions & 11 deletions exporter/exporterhelper/internal/bounded_memory_queue_test.go
@@ -21,7 +21,7 @@ import (
"go.opentelemetry.io/collector/exporter/exportertest"
)

func newNopQueueSettings(callback func(item Request)) QueueSettings {
func newNopQueueSettings(callback func(item *Request)) QueueSettings {
return QueueSettings{
CreateSettings: exportertest.NewNopCreateSettings(),
DataType: component.DataTypeMetrics,
@@ -30,27 +30,30 @@ func newNopQueueSettings(callback func(item Request)) QueueSettings {
}

type stringRequest struct {
Request
str string
}

func newStringRequest(str string) Request {
return stringRequest{str: str}
func (s stringRequest) Export(_ context.Context) error {
return nil
}

func newStringRequest(str string) *Request {
return NewRequest(context.Background(), stringRequest{str: str})
}

// In this test we run a queue with capacity 1 and a single consumer.
// We want to test the overflow behavior, so we block the consumer
// by holding a startLock before submitting items to the queue.
func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerFn func(item Request))) {
func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerFn func(item *Request))) {
q := NewBoundedMemoryQueue(1, 1)

var startLock sync.Mutex

startLock.Lock() // block consumers
consumerState := newConsumerState(t)

startConsumers(q, func(item Request) {
consumerState.record(item.(stringRequest).str)
startConsumers(q, func(item *Request) {
consumerState.record(item.Request.(stringRequest).str)

// block further processing until startLock is released
startLock.Lock()
@@ -100,7 +103,7 @@ func helper(t *testing.T, startConsumers func(q ProducerConsumerQueue, consumerF
}

func TestBoundedQueue(t *testing.T) {
helper(t, func(q ProducerConsumerQueue, consumerFn func(item Request)) {
helper(t, func(q ProducerConsumerQueue, consumerFn func(item *Request)) {
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(consumerFn)))
})
}
@@ -116,8 +119,8 @@ func TestShutdownWhileNotEmpty(t *testing.T) {

consumerState := newConsumerState(t)

assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {
consumerState.record(item.(stringRequest).str)
assert.NoError(t, q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item *Request) {
consumerState.record(item.Request.(stringRequest).str)
time.Sleep(1 * time.Second)
})))

@@ -198,7 +201,7 @@ func (s *consumerState) assertConsumed(expected map[string]bool) {
func TestZeroSize(t *testing.T) {
q := NewBoundedMemoryQueue(0, 1)

err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item Request) {}))
err := q.Start(context.Background(), componenttest.NewNopHost(), newNopQueueSettings(func(item *Request) {}))
assert.NoError(t, err)

assert.False(t, q.Produce(newStringRequest("a"))) // in process
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/persistent_queue.go
@@ -80,7 +80,7 @@ func (pq *persistentQueue) Start(ctx context.Context, host component.Host, set Q
}

// Produce adds an item to the queue and returns true if it was accepted
func (pq *persistentQueue) Produce(item Request) bool {
func (pq *persistentQueue) Produce(item *Request) bool {
err := pq.storage.put(item)
return err == nil
}
10 changes: 5 additions & 5 deletions exporter/exporterhelper/internal/persistent_queue_test.go
@@ -32,7 +32,7 @@ func (nh *mockHost) GetExtensions() map[component.ID]component.Component {
}

// createTestQueue creates and starts a fake queue with the given capacity and number of consumers.
func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(item Request)) ProducerConsumerQueue {
func createTestQueue(t *testing.T, capacity, numConsumers int, callback func(item *Request)) ProducerConsumerQueue {
pq := NewPersistentQueue(capacity, numConsumers, component.ID{}, newFakeTracesRequestMarshalerFunc(),
newFakeTracesRequestUnmarshalerFunc())
host := &mockHost{ext: map[component.ID]component.Component{
@@ -51,7 +51,7 @@ func TestPersistentQueue_Capacity(t *testing.T) {
host := &mockHost{ext: map[component.ID]component.Component{
{}: NewMockStorageExtension(nil),
}}
err := pq.Start(context.Background(), host, newNopQueueSettings(func(req Request) {}))
err := pq.Start(context.Background(), host, newNopQueueSettings(func(req *Request) {}))
require.NoError(t, err)

// Stop consumer to imitate queue overflow
@@ -85,7 +85,7 @@
}

func TestPersistentQueue_Close(t *testing.T) {
wq := createTestQueue(t, 1001, 100, func(item Request) {})
wq := createTestQueue(t, 1001, 100, func(item *Request) {})
traces := newTraces(1, 10)
req := newFakeTracesRequest(traces)

@@ -104,7 +104,7 @@ func TestPersistentQueue_Close(t *testing.T) {

// Verify storage closes after queue consumers. If not in this order, successfully consumed items won't be updated in storage
func TestPersistentQueue_Close_StorageCloseAfterConsumers(t *testing.T) {
wq := createTestQueue(t, 1001, 1, func(item Request) {})
wq := createTestQueue(t, 1001, 1, func(item *Request) {})
traces := newTraces(1, 10)

lastRequestProcessedTime := time.Now()
@@ -163,7 +163,7 @@ func TestPersistentQueue_ConsumersProducers(t *testing.T) {
req := newFakeTracesRequest(traces)

numMessagesConsumed := &atomic.Int32{}
tq := createTestQueue(t, 1000, c.numConsumers, func(item Request) {
tq := createTestQueue(t, 1000, c.numConsumers, func(item *Request) {
if item != nil {
numMessagesConsumed.Add(int32(1))
}
24 changes: 12 additions & 12 deletions exporter/exporterhelper/internal/persistent_storage.go
@@ -50,7 +50,7 @@ type persistentContiguousStorage struct {
stopOnce sync.Once
capacity uint64

reqChan chan Request
reqChan chan *Request

mu sync.Mutex
readIndex itemIndex
@@ -91,7 +91,7 @@ func newPersistentContiguousStorage(ctx context.Context, queueName string, clien
marshaler: marshaler,
capacity: capacity,
putChan: make(chan struct{}, capacity),
reqChan: make(chan Request),
reqChan: make(chan *Request),
stopChan: make(chan struct{}),
itemsCount: &atomic.Uint64{},
}
@@ -145,7 +145,7 @@ func initPersistentContiguousStorage(ctx context.Context, pcs *persistentContigu
pcs.itemsCount.Store(uint64(pcs.writeIndex - pcs.readIndex))
}

func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []Request) {
func (pcs *persistentContiguousStorage) enqueueNotDispatchedReqs(reqs []*Request) {
if len(reqs) > 0 {
errCount := 0
for _, req := range reqs {
@@ -183,7 +183,7 @@ func (pcs *persistentContiguousStorage) loop() {
}

// get returns the request channel that all the requests will be send on
func (pcs *persistentContiguousStorage) get() <-chan Request {
func (pcs *persistentContiguousStorage) get() <-chan *Request {
return pcs.reqChan
}

@@ -203,7 +203,7 @@ func (pcs *persistentContiguousStorage) stop() {
}

// put marshals the request and puts it into the persistent queue
func (pcs *persistentContiguousStorage) put(req Request) error {
func (pcs *persistentContiguousStorage) put(req *Request) error {
// Nil requests are ignored
if req == nil {
return nil
@@ -231,7 +231,7 @@
}

// getNextItem pulls the next available item from the persistent storage; if none is found, returns (nil, false)
func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (Request, bool) {
func (pcs *persistentContiguousStorage) getNextItem(ctx context.Context) (*Request, bool) {
pcs.mu.Lock()
defer pcs.mu.Unlock()

@@ -244,7 +244,7 @@
pcs.updateReadIndex(ctx)
pcs.itemDispatchingStart(ctx, index)

var req Request
var req *Request
batch, err := newBatch(pcs).get(pcs.itemKey(index)).execute(ctx)
if err == nil {
req, err = batch.getRequestResult(pcs.itemKey(index))
@@ -261,14 +261,14 @@
}

// If all went well so far, cleanup will be handled by callback
req.SetOnProcessingFinished(func() {
req.processingFinishedCallback = func() {
pcs.mu.Lock()
defer pcs.mu.Unlock()
if err := pcs.itemDispatchingFinish(ctx, index); err != nil {
pcs.logger.Error("Error deleting item from queue",
zap.String(zapQueueNameKey, pcs.queueName), zap.Error(err))
}
})
}
return req, true
}

@@ -278,8 +278,8 @@
// retrieveNotDispatchedReqs gets the items for which sending was not finished, cleans the storage
// and moves the items back to the queue. The function returns an array which might contain nils
// if unmarshalling of the value at a given index was not possible.
func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []Request {
var reqs []Request
func (pcs *persistentContiguousStorage) retrieveNotDispatchedReqs(ctx context.Context) []*Request {
var reqs []*Request
var dispatchedItems []itemIndex

pcs.mu.Lock()
@@ -302,7 +302,7 @@
pcs.logger.Debug("No items left for dispatch by consumers")
}

reqs = make([]Request, len(dispatchedItems))
reqs = make([]*Request, len(dispatchedItems))
keys := make([]string, len(dispatchedItems))
retrieveBatch := newBatch(pcs)
cleanupBatch := newBatch(pcs)
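Because the wrapper is now a struct in the same package, the storage can assign the cleanup callback to the field directly instead of going through the removed SetOnProcessingFinished setter. The consumer side is outside this diff; a sketch of how that callback is expected to fire, assuming a loop that drains get(), might look like:

// Illustrative only: drain the request channel and fire the callback that
// getNextItem attached, so the finished item is removed from storage.
for req := range pcs.get() {
	if req == nil {
		continue // unmarshaling failed for this item; nothing to hand out
	}
	consume(req) // hypothetical consumer, e.g. the exporter's send path
	if req.processingFinishedCallback != nil {
		req.processingFinishedCallback() // runs itemDispatchingFinish for this item
	}
}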
8 changes: 4 additions & 4 deletions exporter/exporterhelper/internal/persistent_storage_batch.go
@@ -93,7 +93,7 @@ func (bof *batchStruct) getResult(key string, unmarshal func([]byte) (any, error

// getRequestResult returns the result of a Get operation as a request
// If the value cannot be retrieved, it returns an error
func (bof *batchStruct) getRequestResult(key string) (Request, error) {
func (bof *batchStruct) getRequestResult(key string) (*Request, error) {
reqIf, err := bof.getResult(key, bof.bytesToRequest)
if err != nil {
return nil, err
@@ -102,7 +102,7 @@ func (bof *batchStruct) getRequestResult(key string) (*Request, error) {
return nil, errValueNotSet
}

return reqIf.(Request), nil
return reqIf.(*Request), nil
}

// getItemIndexResult returns the result of a Get operation as an itemIndex
@@ -136,7 +136,7 @@ func (bof *batchStruct) getItemIndexArrayResult(key string) ([]itemIndex, error)
}

// setRequest adds Set operation over a given request to the batch
func (bof *batchStruct) setRequest(key string, value Request) *batchStruct {
func (bof *batchStruct) setRequest(key string, value *Request) *batchStruct {
return bof.set(key, value, bof.requestToBytes)
}

@@ -207,7 +207,7 @@ func bytesToItemIndexArray(b []byte) (any, error) {
}

func (bof *batchStruct) requestToBytes(req any) ([]byte, error) {
return bof.pcs.marshaler(req.(Request))
return bof.pcs.marshaler(req.(*Request))
}

func (bof *batchStruct) bytesToRequest(b []byte) (any, error) {
