From 51bba3f067af1cce82d80e86bb8fe3eb86cd4a6f Mon Sep 17 00:00:00 2001 From: Bo-Yi Wu Date: Fri, 24 Dec 2021 19:35:18 +0800 Subject: [PATCH] chore(metric): Add metric interface (#40) --- consumer.go | 21 +++++++++++---------- metric.go | 30 ++++++++++++++++++++++++++++++ options.go | 10 ++++++++++ 3 files changed, 51 insertions(+), 10 deletions(-) create mode 100644 metric.go diff --git a/consumer.go b/consumer.go index a9d6a54..eca2af7 100644 --- a/consumer.go +++ b/consumer.go @@ -15,25 +15,25 @@ var errMaxCapacity = errors.New("max capacity reached") // Worker for simple queue using channel type Consumer struct { - taskQueue chan QueuedMessage - runFunc func(context.Context, QueuedMessage) error - stop chan struct{} - logger Logger - stopOnce sync.Once - stopFlag int32 - busyWorkers uint64 + taskQueue chan QueuedMessage + runFunc func(context.Context, QueuedMessage) error + stop chan struct{} + logger Logger + stopOnce sync.Once + stopFlag int32 + metric Metric } func (s *Consumer) incBusyWorker() { - atomic.AddUint64(&s.busyWorkers, 1) + s.metric.IncBusyWorker() } func (s *Consumer) decBusyWorker() { - atomic.AddUint64(&s.busyWorkers, ^uint64(0)) + s.metric.DecBusyWorker() } func (s *Consumer) BusyWorkers() uint64 { - return atomic.LoadUint64(&s.busyWorkers) + return s.metric.BusyWorkers() } // BeforeRun run script before start worker @@ -168,6 +168,7 @@ func NewConsumer(opts ...Option) *Consumer { stop: make(chan struct{}), logger: o.logger, runFunc: o.fn, + metric: o.metric, } return w diff --git a/metric.go b/metric.go new file mode 100644 index 0000000..04d50f8 --- /dev/null +++ b/metric.go @@ -0,0 +1,30 @@ +package queue + +import "sync/atomic" + +// Metric interface +type Metric interface { + IncBusyWorker() + DecBusyWorker() + BusyWorkers() uint64 +} + +type metric struct { + busyWorkers uint64 +} + +func newMetric() Metric { + return &metric{} +} + +func (m *metric) IncBusyWorker() { + atomic.AddUint64(&m.busyWorkers, 1) +} + +func (m *metric) DecBusyWorker() { + atomic.AddUint64(&m.busyWorkers, ^uint64(0)) +} + +func (m *metric) BusyWorkers() uint64 { + return atomic.LoadUint64(&m.busyWorkers) +} diff --git a/options.go b/options.go index d78abbb..7821202 100644 --- a/options.go +++ b/options.go @@ -12,6 +12,7 @@ var ( defaultTimeout = 60 * time.Minute defaultNewLogger = NewLogger() defaultFn = func(context.Context, QueuedMessage) error { return nil } + defaultMetric = newMetric() ) // Option for queue system @@ -38,6 +39,13 @@ func WithLogger(l Logger) Option { } } +// WithMetric set custom Metric +func WithMetric(m Metric) Option { + return func(q *Options) { + q.metric = m + } +} + // WithWorker set custom worker func WithWorker(w Worker) Option { return func(q *Options) { @@ -66,6 +74,7 @@ type Options struct { queueSize int worker Worker fn func(context.Context, QueuedMessage) error + metric Metric } func NewOptions(opts ...Option) *Options { @@ -76,6 +85,7 @@ func NewOptions(opts ...Option) *Options { logger: defaultNewLogger, worker: nil, fn: defaultFn, + metric: defaultMetric, } // Loop through each option