Skip to content

Commit

Permalink
chore(metric): Add metric interface (#40)
Browse files Browse the repository at this point in the history
  • Loading branch information
appleboy authored Dec 24, 2021
1 parent fbd8614 commit 51bba3f
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
21 changes: 11 additions & 10 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,25 @@ var errMaxCapacity = errors.New("max capacity reached")

// Worker for simple queue using channel
type Consumer struct {
taskQueue chan QueuedMessage
runFunc func(context.Context, QueuedMessage) error
stop chan struct{}
logger Logger
stopOnce sync.Once
stopFlag int32
busyWorkers uint64
taskQueue chan QueuedMessage
runFunc func(context.Context, QueuedMessage) error
stop chan struct{}
logger Logger
stopOnce sync.Once
stopFlag int32
metric Metric
}

func (s *Consumer) incBusyWorker() {
atomic.AddUint64(&s.busyWorkers, 1)
s.metric.IncBusyWorker()
}

func (s *Consumer) decBusyWorker() {
atomic.AddUint64(&s.busyWorkers, ^uint64(0))
s.metric.DecBusyWorker()
}

func (s *Consumer) BusyWorkers() uint64 {
return atomic.LoadUint64(&s.busyWorkers)
return s.metric.BusyWorkers()
}

// BeforeRun run script before start worker
Expand Down Expand Up @@ -168,6 +168,7 @@ func NewConsumer(opts ...Option) *Consumer {
stop: make(chan struct{}),
logger: o.logger,
runFunc: o.fn,
metric: o.metric,
}

return w
Expand Down
30 changes: 30 additions & 0 deletions metric.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package queue

import "sync/atomic"

// Metric interface
type Metric interface {
IncBusyWorker()
DecBusyWorker()
BusyWorkers() uint64
}

type metric struct {
busyWorkers uint64
}

func newMetric() Metric {
return &metric{}
}

func (m *metric) IncBusyWorker() {
atomic.AddUint64(&m.busyWorkers, 1)
}

func (m *metric) DecBusyWorker() {
atomic.AddUint64(&m.busyWorkers, ^uint64(0))
}

func (m *metric) BusyWorkers() uint64 {
return atomic.LoadUint64(&m.busyWorkers)
}
10 changes: 10 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ var (
defaultTimeout = 60 * time.Minute
defaultNewLogger = NewLogger()
defaultFn = func(context.Context, QueuedMessage) error { return nil }
defaultMetric = newMetric()
)

// Option for queue system
Expand All @@ -38,6 +39,13 @@ func WithLogger(l Logger) Option {
}
}

// WithMetric set custom Metric
func WithMetric(m Metric) Option {
return func(q *Options) {
q.metric = m
}
}

// WithWorker set custom worker
func WithWorker(w Worker) Option {
return func(q *Options) {
Expand Down Expand Up @@ -66,6 +74,7 @@ type Options struct {
queueSize int
worker Worker
fn func(context.Context, QueuedMessage) error
metric Metric
}

func NewOptions(opts ...Option) *Options {
Expand All @@ -76,6 +85,7 @@ func NewOptions(opts ...Option) *Options {
logger: defaultNewLogger,
worker: nil,
fn: defaultFn,
metric: defaultMetric,
}

// Loop through each option
Expand Down

0 comments on commit 51bba3f

Please sign in to comment.