feat: support io metrics (#3443)
Signed-off-by: Song Gao <[email protected]>
Yisaer authored Jan 6, 2025
1 parent d41a893 commit 91fe83d
Showing 7 changed files with 144 additions and 3 deletions.
43 changes: 43 additions & 0 deletions extensions/impl/kafka/metrics.go
@@ -0,0 +1,43 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package kafka

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/lf-edge/ekuiper/v2/metrics"
)

const (
LblRequest = "req"
LblMessage = "message"
LblException = "exception"
)

var (
KafkaCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "kuiper",
Subsystem: "io",
Name: "kafka_count",
Help: "counter of Kafka IO",
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})

KafkaHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "kuiper",
Subsystem: "io",
Name: "kafka_duration",
Help: "Historgram of Kafka IO",
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
)
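
Note: this hunk defines KafkaCounter and KafkaHist but does not show where they are registered. A minimal sketch, assuming registration happens in this package against the default Prometheus registry (the project may instead register the collectors elsewhere):

// Registration sketch only (not shown in this diff): hook the two vectors
// into the default registry so a /metrics endpoint can expose them.
func init() {
	prometheus.MustRegister(KafkaCounter, KafkaHist)
}
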
28 changes: 26 additions & 2 deletions extensions/impl/kafka/sink.go
@@ -19,13 +19,15 @@ import (
"encoding/json"
"fmt"
"strings"
"time"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/pingcap/failpoint"
kafkago "github.com/segmentio/kafka-go"
"github.com/segmentio/kafka-go/sasl"

"github.com/lf-edge/ekuiper/v2/internal/pkg/util"
"github.com/lf-edge/ekuiper/v2/metrics"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/cert"
)
@@ -172,15 +174,31 @@ func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler)
return err
}

func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) error {
func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) (err error) {
defer func() {
if err != nil {
KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
}()
msgs, err := k.collect(ctx, item)
if err != nil {
return err
}
KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(msgs)))
start := time.Now()
defer func() {
KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
}()
return k.writer.WriteMessages(ctx, msgs...)
}

func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error {
func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error) {
defer func() {
if err != nil {
KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
}()
allMsgs := make([]kafkago.Message, 0)
items.RangeOfTuples(func(index int, tuple api.MessageTuple) bool {
msgs, err := k.collect(ctx, tuple)
@@ -190,6 +208,12 @@ func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleLis
allMsgs = append(allMsgs, msgs...)
return true
})
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(allMsgs)))
KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
start := time.Now()
defer func() {
KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
}()
return k.writer.WriteMessages(ctx, allMsgs...)
}

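
Note: Collect and CollectList share the same instrumentation steps: count the request and the messages, time the write, and let the deferred check on the named return bump the exception counter. A hypothetical helper, not part of this commit, that captures the pattern:

// instrumentWrite is a sketch only: it wraps a Kafka write with the request
// and message counters, the duration histogram (microseconds, as above), and
// the exception counter when the write fails.
func instrumentWrite(ctx api.StreamContext, msgCount int, write func() error) error {
	KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
	KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(msgCount))
	start := time.Now()
	err := write()
	KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
	if err != nil {
		KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
	}
	return err
}

With such a helper, Collect would reduce to return instrumentWrite(ctx, len(msgs), func() error { return k.writer.WriteMessages(ctx, msgs...) }).
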
2 changes: 2 additions & 0 deletions extensions/impl/kafka/source.go
@@ -30,6 +30,7 @@ import (

"github.com/lf-edge/ekuiper/v2/internal/conf"
"github.com/lf-edge/ekuiper/v2/internal/pkg/util"
"github.com/lf-edge/ekuiper/v2/metrics"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/cert"
"github.com/lf-edge/ekuiper/v2/pkg/timex"
@@ -193,6 +194,7 @@ func (k *KafkaSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, i
ingestError(ctx, err)
continue
}
KafkaCounter.WithLabelValues(LblMessage, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
ingest(ctx, msg.Value, nil, timex.GetNow())
}
}
43 changes: 43 additions & 0 deletions extensions/impl/sql/metrics.go
@@ -0,0 +1,43 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package sql

import (
"github.com/prometheus/client_golang/prometheus"

"github.com/lf-edge/ekuiper/v2/metrics"
)

const (
LblRequest = "req"
LblReconn = "reconnect"
LblException = "exception"
)

var (
SQLCounter = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "kuiper",
Subsystem: "io",
Name: "sql_count",
Help: "counter of SQL IO",
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})

SQLHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Namespace: "kuiper",
Subsystem: "io",
Name: "sql_duration",
Help: "Historgram of Kafka IO",
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
)
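
Note: both histograms are fed values from time.Since(start).Microseconds() (see the Observe calls in sink.go and source.go below), while the default Prometheus buckets top out at 10 and are tuned for seconds. If microsecond-scale bucketing is wanted, a hedged sketch of an alternative declaration (same name and labels as above; the buckets are chosen arbitrarily for illustration):

// Sketch only: explicit microsecond buckets from 100µs to ~1.6s.
SQLHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{
	Namespace: "kuiper",
	Subsystem: "io",
	Name:      "sql_duration",
	Help:      "Histogram of SQL IO",
	Buckets:   prometheus.ExponentialBuckets(100, 4, 8),
}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})
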
17 changes: 17 additions & 0 deletions extensions/impl/sql/sink.go
@@ -19,12 +19,14 @@ import (
"fmt"
"reflect"
"strings"
"time"

"github.com/lf-edge/ekuiper/contract/v2/api"
"github.com/pingcap/failpoint"

"github.com/lf-edge/ekuiper/v2/extensions/impl/sql/client"
"github.com/lf-edge/ekuiper/v2/internal/pkg/util"
"github.com/lf-edge/ekuiper/v2/metrics"
"github.com/lf-edge/ekuiper/v2/pkg/ast"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/connection"
@@ -152,6 +154,12 @@ func (s *SQLSinkConnector) Close(ctx api.StreamContext) error {
}

func (s *SQLSinkConnector) Collect(ctx api.StreamContext, item api.MessageTuple) (err error) {
defer func() {
if err != nil {
SQLCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
}()
SQLCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return s.collect(ctx, item.ToMap())
}

@@ -175,6 +183,12 @@ func (s *SQLSinkConnector) collect(ctx api.StreamContext, item map[string]any) (
}

func (s *SQLSinkConnector) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error) {
defer func() {
if err != nil {
SQLCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
}
}()
SQLCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return s.collectList(ctx, items.ToMaps())
}

@@ -269,11 +283,13 @@ func (s *SQLSinkConnector) save(ctx api.StreamContext, table string, data map[st
func (s *SQLSinkConnector) writeToDB(ctx api.StreamContext, sqlStr string) error {
ctx.GetLogger().Debugf(sqlStr)
if s.needReconnect {
SQLCounter.WithLabelValues(LblReconn, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
err := s.conn.Reconnect()
if err != nil {
return errorx.NewIOErr(err.Error())
}
}
start := time.Now()
r, err := s.conn.GetDB().Exec(sqlStr)
failpoint.Inject("dbErr", func() {
err = errors.New("dbErr")
@@ -282,6 +298,7 @@ func (s *SQLSinkConnector) writeToDB(ctx api.StreamContext, sqlStr string) error
s.needReconnect = true
return errorx.NewIOErr(err.Error())
}
SQLHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
s.needReconnect = false
d, err := r.RowsAffected()
if err != nil {
11 changes: 10 additions & 1 deletion extensions/impl/sql/source.go
@@ -29,6 +29,7 @@ import (
client2 "github.com/lf-edge/ekuiper/v2/extensions/impl/sql/client"
"github.com/lf-edge/ekuiper/v2/extensions/impl/sql/sqldatabase/sqlgen"
"github.com/lf-edge/ekuiper/v2/internal/pkg/util"
"github.com/lf-edge/ekuiper/v2/metrics"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/connection"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
@@ -127,16 +128,19 @@ func (s *SQLSourceConnector) Close(ctx api.StreamContext) error {
}

func (s *SQLSourceConnector) Pull(ctx api.StreamContext, recvTime time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) {
SQLCounter.WithLabelValues(LblRequest, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
s.queryData(ctx, recvTime, ingest, ingestError)
}

func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) {
logger := ctx.GetLogger()
if s.needReconnect {
SQLCounter.WithLabelValues(LblReconn, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
err := s.conn.Reconnect()
if err != nil {
logger.Errorf("reconnect db error %v", err)
ingestError(ctx, err)
SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return
}
}
@@ -147,20 +151,23 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time,
if err != nil {
logger.Errorf("Get sql query error %v", err)
ingestError(ctx, err)
SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return
}
logger.Debugf("Query the database with %s", query)
start := time.Now()
rows, err := s.conn.GetDB().Query(query)
failpoint.Inject("QueryErr", func() {
err = errors.New("QueryErr")
})
SQLHist.WithLabelValues(LblRequest, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds()))
if err != nil {
logger.Errorf("query sql error %v", err)
s.needReconnect = true
ingestError(ctx, err)
SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return
} else if s.needReconnect {
logger.Infof("reconnect sql success")
s.needReconnect = false
}
cols, _ := rows.Columns()
@@ -171,6 +178,7 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time,
if err != nil {
logger.Errorf("query %v row ColumnTypes error %v", query, err)
ingestError(ctx, err)
SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return
}
for rows.Next() {
@@ -184,6 +192,7 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time,
if err != nil {
logger.Errorf("Run sql scan(%s) error %v", query, err)
ingestError(ctx, err)
SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
return
}
scanIntoMap(data, columns, cols)
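
Note: queryData increments the exception counter at every error path right next to its ingestError call. A hypothetical wrapper, not in this commit and assuming api.ErrorIngest is a plain function type as its call sites here suggest, would centralize that:

// countingIngestError is a sketch only: it decorates an api.ErrorIngest so
// every reported error also bumps the source-side exception counter.
func countingIngestError(ingestError api.ErrorIngest) api.ErrorIngest {
	return func(ctx api.StreamContext, err error) {
		SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc()
		ingestError(ctx, err)
	}
}
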
3 changes: 3 additions & 0 deletions metrics/metrics.go
@@ -21,9 +21,12 @@ const (
LblStatusType = "status"
LblRuleIDType = "rule"
LblOpIDType = "op"
LblIOType = "io"

LBlRuleRunning = "running"
LblRuleStop = "stop"
LblSourceIO = "source"
LblSinkIO = "sink"
)

var (
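
Note: with the new LblIOType label and the source/sink values, every IO series is keyed by event type, IO direction, rule id, and operator id. A self-contained sketch (the metric name, rule id, and operator id are invented for the example) of how one such series accumulates and can be read back with testutil:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/testutil"

	"github.com/lf-edge/ekuiper/v2/metrics"
)

func main() {
	// A throwaway counter mirroring the label schema added in this commit.
	c := prometheus.NewCounterVec(prometheus.CounterOpts{
		Namespace: "kuiper",
		Subsystem: "io",
		Name:      "example_count",
		Help:      "Counter used only in this sketch",
	}, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType})

	// One sink request for a hypothetical rule "demoRule", operator "op_1".
	c.WithLabelValues("req", metrics.LblSinkIO, "demoRule", "op_1").Inc()

	// testutil.ToFloat64 reads back the current value of a single series.
	fmt.Println(testutil.ToFloat64(c.WithLabelValues("req", metrics.LblSinkIO, "demoRule", "op_1"))) // prints 1
}
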
