-
Notifications
You must be signed in to change notification settings - Fork 63
Add processor queue #197
Add processor queue #197
Conversation
This is the basis to do the work for actually implementing OC collector in the short run. At first enable receivers: 1. OC Receiver 2. Jaeger 3. Zipkin The queue processor is added but not connected to exporters by default, although there are options to do so. As previously indicated before there is some duplication of code with the agent, this should be reduced as we implement the interfaces and project coalesce around certain patterns. The queue is missing its metrics and those will be added soon.
@pjanotti sorry for the late review responses. From our discussions today after the meeting and also with @bogdandrutu, I've done some internal reflection and will be updating some interfaces and structure later this weekend and early next week. For now though, I'll do a review of this code but as we discussed will need some changes in follow-up PRs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates @pjanotti, I've added some suggestions for updates and then after that LGTM!
|
||
spans := []*tracepb.Span{{}} | ||
wantBatches := 10 | ||
wg.Add(wantBatches) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this test ever terminate? I see wg.Add(n)
but never any wg.Done()
n times.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😄 yes, it terminates (Travis agree). The wait group is passed to the mock and the mock is in charge of calling wg.Done()
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LOL gotcha, that confused me a bit
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Later perphaps it would be nice to make mockConcurrentSpanProcessor handle all the synchronization logic otherwise right now we are leaking the abstraction with wg.Add and wg.Wait yet no wg.Done.
Methods perhaps:
(mockSpanProcessor) runConcurrently(fn func())
which will handle doingwg.Add
and thenwg.Done
(mockSpanProcessor) awaitAsyncProcessing()
which then invokewg.Wait
will ensure that we don't have to expose such internals nor ever have to worry, thinking about termination
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good suggestion, done.
} | ||
wantSpans += len(spans) | ||
spans = append(spans, &tracepb.Span{}) | ||
go goFn(batch) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We never know when this goroutine terminates. Seems like you actually meant to use the sync.WaitGroup with this goroutine.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Gotcha gotcha, thanks!
} | ||
|
||
// Wait until all batches received | ||
wg.Wait() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see any usage of wg.Done
n times, looks like something is missing and I'd be surprised if we ever get past this point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you @pjanotti, LGTM with just one last suggestion to avoid a leaky abstraction with the mockSpanProcessor in test.
* Start adding pipeline to collector This is the basis to do the work for actually implementing OC collector in the short run. At first enable receivers: 1. OC Receiver 2. Jaeger 3. Zipkin The queue processor is added but not connected to exporters by default, although there are options to do so. As previously indicated before there is some duplication of code with the agent, this should be reduced as we implement the interfaces and project coalesce around certain patterns. The queue is missing its metrics and those will be added soon.
Adding pipeline to collector
This is the basis to do the work for actually implementing OC collector. At first enable receivers:
and connects them with the exporters
The queue processor is added but not connected to exporters by default, although there are
options to do so. They are not connected because the exporters do their own buffering and
the queue is intended to work with senders that do no buffering.
As previously indicated before there is some duplication of code with the agent, this should
be reduced as we implement the interfaces and project coalesce around certain patterns.
The queue is still missing its metrics and more tests.
Close #169
Close #174