diff --git a/extensions/impl/kafka/metrics.go b/extensions/impl/kafka/metrics.go new file mode 100644 index 0000000000..58a3a506cf --- /dev/null +++ b/extensions/impl/kafka/metrics.go @@ -0,0 +1,46 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package kafka + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/lf-edge/ekuiper/v2/metrics" +) + +const ( + LblRequest = "req" + LblMessage = "message" + LblException = "exception" +) + +var ( + // KafkaCounter counts Kafka IO requests, messages and exceptions per rule and operator. + KafkaCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "kuiper", + Subsystem: "io", + Name: "kafka_count", + Help: "counter of Kafka IO", + }, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType}) + + // KafkaHist records Kafka IO request durations in microseconds; buckets run from 10us to about 5.2s. + KafkaHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "kuiper", + Subsystem: "io", + Name: "kafka_duration", + Help: "Histogram of Kafka IO duration in microseconds", + Buckets: prometheus.ExponentialBuckets(10, 2, 20), + }, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType}) +) diff --git a/extensions/impl/kafka/sink.go b/extensions/impl/kafka/sink.go index defbb97836..0e1a9495da 100644 --- a/extensions/impl/kafka/sink.go +++ b/extensions/impl/kafka/sink.go @@ -19,6 +19,7 @@ import ( "encoding/json" "fmt" "strings" + "time" "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/pingcap/failpoint" @@ -26,6 +27,7 @@ import ( "github.com/segmentio/kafka-go/sasl" 
"github.com/lf-edge/ekuiper/v2/internal/pkg/util" + "github.com/lf-edge/ekuiper/v2/metrics" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/cert" ) @@ -172,15 +174,31 @@ func (k *KafkaSink) Connect(ctx api.StreamContext, sch api.StatusChangeHandler) return err } -func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) error { +func (k *KafkaSink) Collect(ctx api.StreamContext, item api.MessageTuple) (err error) { + defer func() { + if err != nil { + KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() + } + }() msgs, err := k.collect(ctx, item) if err != nil { return err } + KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() + KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(msgs))) + start := time.Now() + defer func() { + KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds())) + }() return k.writer.WriteMessages(ctx, msgs...) } -func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) error { +func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error) { + defer func() { + if err != nil { + KafkaCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() + } + }() allMsgs := make([]kafkago.Message, 0) items.RangeOfTuples(func(index int, tuple api.MessageTuple) bool { msgs, err := k.collect(ctx, tuple) @@ -190,6 +208,12 @@ func (k *KafkaSink) CollectList(ctx api.StreamContext, items api.MessageTupleLis allMsgs = append(allMsgs, msgs...) 
return true }) + KafkaCounter.WithLabelValues(LblMessage, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Add(float64(len(allMsgs))) + KafkaCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() + start := time.Now() + defer func() { + KafkaHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds())) + }() return k.writer.WriteMessages(ctx, allMsgs...) } diff --git a/extensions/impl/kafka/source.go b/extensions/impl/kafka/source.go index f9cb4985a5..c3b25d706f 100644 --- a/extensions/impl/kafka/source.go +++ b/extensions/impl/kafka/source.go @@ -30,6 +30,7 @@ import ( "github.com/lf-edge/ekuiper/v2/internal/conf" "github.com/lf-edge/ekuiper/v2/internal/pkg/util" + "github.com/lf-edge/ekuiper/v2/metrics" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/cert" "github.com/lf-edge/ekuiper/v2/pkg/timex" @@ -193,6 +194,7 @@ func (k *KafkaSource) Subscribe(ctx api.StreamContext, ingest api.BytesIngest, i ingestError(ctx, err) continue } + KafkaCounter.WithLabelValues(LblMessage, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() ingest(ctx, msg.Value, nil, timex.GetNow()) } } diff --git a/extensions/impl/sql/metrics.go b/extensions/impl/sql/metrics.go new file mode 100644 index 0000000000..17f6296d3d --- /dev/null +++ b/extensions/impl/sql/metrics.go @@ -0,0 +1,46 @@ +// Copyright 2024 EMQ Technologies Co., Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package sql + +import ( + "github.com/prometheus/client_golang/prometheus" + + "github.com/lf-edge/ekuiper/v2/metrics" +) + +const ( + LblRequest = "req" + LblReconn = "reconnect" + LblException = "exception" +) + +var ( + // SQLCounter counts SQL IO requests, reconnects and exceptions per rule and operator. + SQLCounter = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "kuiper", + Subsystem: "io", + Name: "sql_count", + Help: "counter of SQL IO", + }, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType}) + + // SQLHist records SQL statement and query durations in microseconds; buckets run from 10us to about 5.2s. + SQLHist = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: "kuiper", + Subsystem: "io", + Name: "sql_duration", + Help: "Histogram of SQL IO duration in microseconds", + Buckets: prometheus.ExponentialBuckets(10, 2, 20), + }, []string{metrics.LblType, metrics.LblIOType, metrics.LblRuleIDType, metrics.LblOpIDType}) +) diff --git a/extensions/impl/sql/sink.go b/extensions/impl/sql/sink.go index 15add0364e..638066e4a3 100644 --- a/extensions/impl/sql/sink.go +++ b/extensions/impl/sql/sink.go @@ -19,12 +19,14 @@ import ( "fmt" "reflect" "strings" + "time" "github.com/lf-edge/ekuiper/contract/v2/api" "github.com/pingcap/failpoint" "github.com/lf-edge/ekuiper/v2/extensions/impl/sql/client" "github.com/lf-edge/ekuiper/v2/internal/pkg/util" + "github.com/lf-edge/ekuiper/v2/metrics" "github.com/lf-edge/ekuiper/v2/pkg/ast" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/connection" @@ -152,6 +154,12 @@ func (s *SQLSinkConnector) Close(ctx api.StreamContext) error { } func (s *SQLSinkConnector) Collect(ctx api.StreamContext, item api.MessageTuple) (err error) { + defer func() { + if err != nil { + SQLCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() + } + }() + SQLCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() return s.collect(ctx, item.ToMap()) } @@ -175,6 +183,12 @@ func (s *SQLSinkConnector) collect(ctx api.StreamContext, item 
map[string]any) ( } func (s *SQLSinkConnector) CollectList(ctx api.StreamContext, items api.MessageTupleList) (err error) { + defer func() { + if err != nil { + SQLCounter.WithLabelValues(LblException, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() + } + }() + SQLCounter.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() return s.collectList(ctx, items.ToMaps()) } @@ -269,11 +283,13 @@ func (s *SQLSinkConnector) save(ctx api.StreamContext, table string, data map[st func (s *SQLSinkConnector) writeToDB(ctx api.StreamContext, sqlStr string) error { ctx.GetLogger().Debugf(sqlStr) if s.needReconnect { + SQLCounter.WithLabelValues(LblReconn, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() err := s.conn.Reconnect() if err != nil { return errorx.NewIOErr(err.Error()) } } + start := time.Now() r, err := s.conn.GetDB().Exec(sqlStr) failpoint.Inject("dbErr", func() { err = errors.New("dbErr") @@ -282,6 +298,7 @@ func (s *SQLSinkConnector) writeToDB(ctx api.StreamContext, sqlStr string) error s.needReconnect = true return errorx.NewIOErr(err.Error()) } + SQLHist.WithLabelValues(LblRequest, metrics.LblSinkIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds())) s.needReconnect = false d, err := r.RowsAffected() if err != nil { diff --git a/extensions/impl/sql/source.go b/extensions/impl/sql/source.go index 172b8c3fbb..5a67959bb3 100644 --- a/extensions/impl/sql/source.go +++ b/extensions/impl/sql/source.go @@ -29,6 +29,7 @@ import ( client2 "github.com/lf-edge/ekuiper/v2/extensions/impl/sql/client" "github.com/lf-edge/ekuiper/v2/extensions/impl/sql/sqldatabase/sqlgen" "github.com/lf-edge/ekuiper/v2/internal/pkg/util" + "github.com/lf-edge/ekuiper/v2/metrics" "github.com/lf-edge/ekuiper/v2/pkg/cast" "github.com/lf-edge/ekuiper/v2/pkg/connection" "github.com/lf-edge/ekuiper/v2/pkg/modules" @@ -127,16 +128,19 @@ func (s *SQLSourceConnector) Close(ctx api.StreamContext) error { } func (s 
*SQLSourceConnector) Pull(ctx api.StreamContext, recvTime time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) { + SQLCounter.WithLabelValues(LblRequest, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() s.queryData(ctx, recvTime, ingest, ingestError) } func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time, ingest api.TupleIngest, ingestError api.ErrorIngest) { logger := ctx.GetLogger() if s.needReconnect { + SQLCounter.WithLabelValues(LblReconn, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() err := s.conn.Reconnect() if err != nil { logger.Errorf("reconnect db error %v", err) ingestError(ctx, err) + SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() return } } @@ -147,20 +151,23 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time, if err != nil { logger.Errorf("Get sql query error %v", err) ingestError(ctx, err) + SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() return } logger.Debugf("Query the database with %s", query) + start := time.Now() rows, err := s.conn.GetDB().Query(query) failpoint.Inject("QueryErr", func() { err = errors.New("QueryErr") }) + SQLHist.WithLabelValues(LblRequest, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Observe(float64(time.Since(start).Microseconds())) if err != nil { logger.Errorf("query sql error %v", err) s.needReconnect = true ingestError(ctx, err) + SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() return } else if s.needReconnect { - logger.Infof("reconnect sql success") s.needReconnect = false } cols, _ := rows.Columns() @@ -171,6 +178,7 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time, if err != nil { logger.Errorf("query %v row ColumnTypes error %v", query, err) ingestError(ctx, err) + SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, 
ctx.GetRuleId(), ctx.GetOpId()).Inc() return } for rows.Next() { @@ -184,6 +192,7 @@ func (s *SQLSourceConnector) queryData(ctx api.StreamContext, rcvTime time.Time, if err != nil { logger.Errorf("Run sql scan(%s) error %v", query, err) ingestError(ctx, err) + SQLCounter.WithLabelValues(LblException, metrics.LblSourceIO, ctx.GetRuleId(), ctx.GetOpId()).Inc() return } scanIntoMap(data, columns, cols) diff --git a/metrics/metrics.go b/metrics/metrics.go index 90e70b586c..641a301b1a 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -21,9 +21,12 @@ const ( LblStatusType = "status" LblRuleIDType = "rule" LblOpIDType = "op" + LblIOType = "io" LBlRuleRunning = "running" LblRuleStop = "stop" + LblSourceIO = "source" + LblSinkIO = "sink" ) var (