Skip to content

Commit

Permalink
feat: support dump metrics (#3488)
Browse files Browse the repository at this point in the history
Signed-off-by: Song Gao <[email protected]>
  • Loading branch information
Yisaer authored Jan 7, 2025
1 parent 91fe83d commit 0c6f4a0
Show file tree
Hide file tree
Showing 12 changed files with 479 additions and 9 deletions.
3 changes: 3 additions & 0 deletions etc/kuiper.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ basic:
backoffMaxElapsedDuration: 3m
# enableResourceProfiling indicates whether to enable resource profiling for eKuiper. If it is enabled, the cpu usage of the rule will be recorded.
enableResourceProfiling: false
metricsDumpConfig:
enable: false
retainedDuration: 6h

# The default options for all rules. Each rule can override this setting by defining its own option
rule:
Expand Down
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ require (
github.com/uber/athenadriver v1.1.15
github.com/ugorji/go/codec v1.2.12
github.com/urfave/cli v1.22.15
github.com/utahta/go-cronowriter v1.2.0
github.com/valyala/fastjson v1.6.4
github.com/vertica/vertica-sql-go v1.3.3
github.com/xo/dburl v0.23.2
Expand Down Expand Up @@ -299,8 +300,8 @@ require (
github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.59.1 // indirect
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.59.1
github.com/prometheus/procfs v0.15.1 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1107,9 +1107,11 @@ github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjS
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2ttpEmkA4aQ3j4u9dStX2t4M8UM6qqNsG8=
github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible/go.mod h1:ZQnN8lSECaebrkQytbHj4xNgtg8CR7RYXnPok8e0EHA=
github.com/lestrrat-go/strftime v0.0.0-20180220091553-9948d03c6207/go.mod h1:RMlXygAD3c48Psmr06d2G75L4E4xxzxkIe/+ppX9eAU=
github.com/lestrrat-go/strftime v0.0.0-20180821113735-8b31f9c59b0f/go.mod h1:RMlXygAD3c48Psmr06d2G75L4E4xxzxkIe/+ppX9eAU=
github.com/lestrrat-go/strftime v1.1.0 h1:gMESpZy44/4pXLO/m+sL0yBd1W6LjgjrrD4a68Gapyg=
github.com/lestrrat-go/strftime v1.1.0/go.mod h1:uzeIB52CeUJenCo1syghlugshMysrqUT51HlxphXVeI=
github.com/lestrrat/go-envload v0.0.0-20180220120943-6ed08b54a570/go.mod h1:BLt8L9ld7wVsvEWQbuLrUZnCMnUmLZ+CGDzKtclrTlE=
github.com/lf-edge/ekuiper/contract/v2 v2.0.0 h1:MJB4ZDG099rbHRYxry0X1Iu79kgMVencHd7YX80N1dw=
github.com/lf-edge/ekuiper/contract/v2 v2.0.0/go.mod h1:Y8QwH4jJHqpx3id9mCp7KYS3gKmHtE5NKYeZ3ZK7C2E=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
Expand Down Expand Up @@ -1304,6 +1306,7 @@ github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.1-0.20180311214515-816c9085562c/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -1442,6 +1445,7 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/substrait-io/substrait-go v0.4.2/go.mod h1:qhpnLmrcvAnlZsUyPXZRqldiHapPTXC3t7xFgDi3aQg=
github.com/tebeka/strftime v0.0.0-20140926081919-3f9c7761e312/go.mod h1:o6CrSUtupq/A5hylbvAsdydn0d5yokJExs8VVdx4wwI=
github.com/tebeka/strftime v0.1.3/go.mod h1:7wJm3dZlpr4l/oVK0t1HYIc4rMzQ2XJlOMIUJUJH6XQ=
github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE=
github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU=
Expand Down Expand Up @@ -1472,6 +1476,8 @@ github.com/ugorji/go/codec v1.2.12 h1:9LC83zGrHhuUA9l16C9AHXAqEV/2wBQ4nkvumAE65E
github.com/ugorji/go/codec v1.2.12/go.mod h1:UNopzCgEMSXjBc6AOMqYvWC1ktqTAfzJZUZgYf6w6lg=
github.com/urfave/cli v1.22.15 h1:nuqt+pdC/KqswQKhETJjo7pvn/k4xMUxgW6liI7XpnM=
github.com/urfave/cli v1.22.15/go.mod h1:wSan1hmo5zeyLGBjRJbzRTNk8gwoYa2B9n4q9dmRIc0=
github.com/utahta/go-cronowriter v1.2.0 h1:XTngg0k0awvVdSzTtnw4JjlOA+FMCr/CmNifi1FHzak=
github.com/utahta/go-cronowriter v1.2.0/go.mod h1:g77x79wGOtCblBDyCRjhlEDt2X2wcCjG9JL/+KL0oso=
github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/vertica/vertica-sql-go v1.3.3 h1:fL+FKEAEy5ONmsvya2WH5T8bhkvY27y/Ik3ReR2T+Qw=
Expand Down
11 changes: 11 additions & 0 deletions internal/conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ type KuiperConf struct {
AesKey string `yaml:"aesKey"`
GracefulShutdownTimeout cast.DurationConf `yaml:"gracefulShutdownTimeout"`
EnableResourceProfiling bool `yaml:"enableResourceProfiling"`
MetricsDumpConfig MetricsDumpConfig `yaml:"metricsDumpConfig"`
}
Rule def.RuleOption
Sink *SinkConf
Expand Down Expand Up @@ -231,6 +232,11 @@ type KuiperConf struct {
AesKey []byte
}

type MetricsDumpConfig struct {
Enable bool `yaml:"enable"`
RetainedDuration time.Duration `yaml:"retainedDuration"`
}

type OpenTelemetry struct {
ServiceName string `yaml:"serviceName"`
EnableRemoteCollector bool `yaml:"enableRemoteCollector"`
Expand Down Expand Up @@ -422,6 +428,11 @@ func InitConf() {
if Config.Source == nil {
Config.Source = &SourceConf{}
}

if Config.Basic.MetricsDumpConfig.RetainedDuration < 1 {
Config.Basic.MetricsDumpConfig.RetainedDuration = 6 * time.Hour
}

_ = Config.Source.Validate()
if Config.Sink == nil {
Config.Sink = &SinkConf{}
Expand Down
23 changes: 23 additions & 0 deletions internal/conf/path.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
dataDir = "data"
logDir = "log"
pluginsDir = "plugins"
metricsDir = "metrics"
KuiperBaseKey = "KuiperBaseKey"
)

Expand All @@ -66,6 +67,14 @@ func GetLogLoc() (string, error) {
return GetLoc(logDir)
}

func GetMetricsLoc() (string, error) {
logPath, err := GetLogLoc()
if err != nil {
return "", err
}
return filepath.Join(logPath, metricsDir), nil
}

func GetDataLoc() (s string, err error) {
defer func() {
failpoint.Inject("GetDataLocErr", func() {
Expand Down Expand Up @@ -177,3 +186,17 @@ func ProcessPath(p string) (string, error) {
}
return abs, nil
}

func InitMetricsFolder() error {
mPath, err := GetLoc(metricsDir)
if err != nil {
return err
}
if _, err := os.Stat(mPath); os.IsNotExist(err) {
err := os.Mkdir(mPath, 0o755)
if err != nil {
return err
}
}
return nil
}
70 changes: 70 additions & 0 deletions internal/server/metrics_init.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2024 EMQ Technologies Co., Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"fmt"
"net/http"
"os"
"path/filepath"
"strconv"
"time"

"github.com/lf-edge/ekuiper/v2/metrics"
)

func dumpMetricsHandler(w http.ResponseWriter, r *http.Request) {
startTime, endTime := extractStartEndTime(r)
zipFilePath, err := metrics.GetMetricsZipFile(startTime, endTime)
if err != nil {
handleError(w, err, "", logger)
return
}
defer os.Remove(zipFilePath)
downloadHandler(zipFilePath, w, r)
}

func extractStartEndTime(r *http.Request) (time.Time, time.Time) {
st := r.URL.Query().Get("startTime")
et := r.URL.Query().Get("endTime")
sti, err1 := strconv.ParseInt(st, 10, 64)
eti, err2 := strconv.ParseInt(et, 10, 64)
if err1 != nil || err2 != nil {
return time.Now().Add(-1 * time.Hour), time.Now()
}
return time.Unix(sti, 0), time.Unix(eti, 0)
}

func downloadHandler(targetFilePath string, w http.ResponseWriter, r *http.Request) {
if _, err := os.Stat(targetFilePath); os.IsNotExist(err) {
http.Error(w, "File not found", http.StatusNotFound)
return
}
file, err := os.Open(targetFilePath)
if err != nil {
http.Error(w, "Failed to open file", http.StatusInternalServerError)
return
}
defer file.Close()
fileInfo, err := file.Stat()
if err != nil {
http.Error(w, "Failed to get file info", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=\"%s\"", filepath.Base(targetFilePath)))
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Length", fmt.Sprintf("%d", fileInfo.Size()))
http.ServeContent(w, r, fileInfo.Name(), fileInfo.ModTime(), file)
}
3 changes: 3 additions & 0 deletions internal/server/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,9 @@ func createRestServer(ip string, port int, needToken bool) *http.Server {
r.HandleFunc("/trace/{id}", getTraceByID).Methods(http.MethodGet)
r.HandleFunc("/trace/rule/{ruleID}", getTraceIDByRuleID).Methods(http.MethodGet)
r.HandleFunc("/tracer", tracerHandler).Methods(http.MethodPost)

// dump metrics
r.HandleFunc("/metrics/dump", dumpMetricsHandler).Methods(http.MethodGet)
// Register extended routes
for k, v := range components {
logger.Infof("register rest endpoint for component %s", k)
Expand Down
2 changes: 2 additions & 0 deletions internal/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/lf-edge/ekuiper/v2/internal/processor"
"github.com/lf-edge/ekuiper/v2/internal/server/bump"
"github.com/lf-edge/ekuiper/v2/internal/topo/rule"
"github.com/lf-edge/ekuiper/v2/metrics"
"github.com/lf-edge/ekuiper/v2/pkg/cast"
"github.com/lf-edge/ekuiper/v2/pkg/connection"
"github.com/lf-edge/ekuiper/v2/pkg/modules"
Expand Down Expand Up @@ -240,6 +241,7 @@ func StartUp(Version string) {
}
}
go runScheduleRuleChecker(serverCtx)
metrics.InitMetricsDumpJob(serverCtx)
async.InitManager()

// Start rest service
Expand Down
2 changes: 1 addition & 1 deletion internal/topo/node/metric/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type PrometheusMetrics struct {

func newPrometheusMetrics() *PrometheusMetrics {
var (
labelNames = []string{"rule", "type", "op", "instance"}
labelNames = []string{"rule", "type", "op", "op_instance"}
prefixes = []string{"kuiper_source", "kuiper_op", "kuiper_sink"}
)
var vecs []*MetricGroup
Expand Down
Loading

0 comments on commit 0c6f4a0

Please sign in to comment.