diff --git a/cmd/thanos/query_frontend.go b/cmd/thanos/query_frontend.go
index 5fa7cf3c5e..5b6fc0b902 100644
--- a/cmd/thanos/query_frontend.go
+++ b/cmd/thanos/query_frontend.go
@@ -145,6 +145,8 @@ func registerQueryFrontend(app *extkingpin.App) {
 	cmd.Flag("query-frontend.log-queries-longer-than", "Log queries that are slower than the specified duration. "+
 		"Set to 0 to disable. Set to < 0 to enable on all queries.").Default("0").DurationVar(&cfg.CortexHandlerConfig.LogQueriesLongerThan)
 
+	cmd.Flag("query-frontend.query-stats-enabled", "Enable query statistics tracking. "+
+		"When enabled, a message with some statistics is logged for every query.").Default("false").BoolVar(&cfg.CortexHandlerConfig.QueryStatsEnabled)
+
 	cmd.Flag("query-frontend.org-id-header", "Deprecation Warning - This flag will be soon deprecated in favor of query-frontend.tenant-header"+
 		" and both flags cannot be used at the same time. "+
@@ -311,7 +313,7 @@ func runQueryFrontend(
 		return err
 	}
 
-	roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper)
+	roundTripper, err := cortexfrontend.NewDownstreamRoundTripper(cfg.DownstreamURL, downstreamTripper, cfg.CortexHandlerConfig.QueryStatsEnabled)
 	if err != nil {
 		return errors.Wrap(err, "setup downstream roundtripper")
 	}
diff --git a/internal/cortex/frontend/downstream_roundtripper.go b/internal/cortex/frontend/downstream_roundtripper.go
index 9629d83ecf..9c165f4476 100644
--- a/internal/cortex/frontend/downstream_roundtripper.go
+++ b/internal/cortex/frontend/downstream_roundtripper.go
@@ -13,17 +13,18 @@ import (
 
 // RoundTripper that forwards requests to downstream URL.
 type downstreamRoundTripper struct {
-	downstreamURL *url.URL
-	transport     http.RoundTripper
+	downstreamURL     *url.URL
+	transport         http.RoundTripper
+	queryStatsEnabled bool
 }
 
-func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper) (http.RoundTripper, error) {
+func NewDownstreamRoundTripper(downstreamURL string, transport http.RoundTripper, queryStatsEnabled bool) (http.RoundTripper, error) {
 	u, err := url.Parse(downstreamURL)
 	if err != nil {
 		return nil, err
 	}
 
-	return &downstreamRoundTripper{downstreamURL: u, transport: transport}, nil
+	return &downstreamRoundTripper{downstreamURL: u, transport: transport, queryStatsEnabled: queryStatsEnabled}, nil
 }
 
 func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) {
@@ -36,6 +37,13 @@ func (d downstreamRoundTripper) RoundTrip(r *http.Request) (*http.Response, erro
 		}
 	}
 
+	if d.queryStatsEnabled {
+		// Add the "stats" query parameter so the downstream querier includes query statistics in its response.
+		q := r.URL.Query()
+		q.Set("stats", "true")
+		r.URL.RawQuery = q.Encode()
+	}
+
 	r.URL.Scheme = d.downstreamURL.Scheme
 	r.URL.Host = d.downstreamURL.Host
 	r.URL.Path = path.Join(d.downstreamURL.Path, r.URL.Path)
diff --git a/internal/cortex/frontend/transport/handler.go b/internal/cortex/frontend/transport/handler.go
index 602a54d027..27e7932662 100644
--- a/internal/cortex/frontend/transport/handler.go
+++ b/internal/cortex/frontend/transport/handler.go
@@ -6,12 +6,14 @@ package transport
 import (
 	"bytes"
 	"context"
+	"encoding/json"
 	"errors"
 	"fmt"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+	"github.com/prometheus/prometheus/util/stats"
 	"io"
 	"net/http"
 	"net/url"
-	"strconv"
 	"strings"
 	"syscall"
 	"time"
@@ -19,12 +21,9 @@ import (
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
 	"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/httpgrpc/server" - querier_stats "github.com/thanos-io/thanos/internal/cortex/querier/stats" - "github.com/thanos-io/thanos/internal/cortex/tenant" "github.com/thanos-io/thanos/internal/cortex/util" util_log "github.com/thanos-io/thanos/internal/cortex/util/log" ) @@ -56,10 +55,9 @@ type Handler struct { roundTripper http.RoundTripper // Metrics. - querySeconds *prometheus.CounterVec - querySeries *prometheus.CounterVec - queryBytes *prometheus.CounterVec - activeUsers *util.ActiveUsersCleanupService + querySeconds *prometheus.HistogramVec + querySamplesTotal *prometheus.HistogramVec + activeUsers *util.ActiveUsersCleanupService } // NewHandler creates a new frontend handler. @@ -71,25 +69,21 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge } if cfg.QueryStatsEnabled { - h.querySeconds = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_query_seconds_total", - Help: "Total amount of wall clock time spend processing queries.", + h.querySeconds = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "thanos_query_frontend_query_seconds", + Help: "Total amount of wall clock time spend processing queries.", + Buckets: []float64{0.01, 0.1, 0.5, 1, 2, 5, 10, 30, 60, 120, 360}, }, []string{"user"}) - h.querySeries = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_query_fetched_series_total", - Help: "Number of series fetched to execute a query.", - }, []string{"user"}) - - h.queryBytes = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ - Name: "cortex_query_fetched_chunks_bytes_total", - Help: "Size of all chunks fetched to execute a query in bytes.", + h.querySamplesTotal = promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ + Name: "thanos_query_frontend_query_total_fetched_samples", + Help: "Number of samples touched to execute a query.", + Buckets: []float64{1, 100, 1000, 10000, 100000, 1000000, 10000000, 100000000}, }, []string{"user"}) h.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(func(user string) { h.querySeconds.DeleteLabelValues(user) - h.querySeries.DeleteLabelValues(user) - h.queryBytes.DeleteLabelValues(user) + h.querySamplesTotal.DeleteLabelValues(user) }) // If cleaner stops or fail, we will simply not clean the metrics for inactive users. _ = h.activeUsers.StartAsync(context.Background()) @@ -98,25 +92,23 @@ func NewHandler(cfg HandlerConfig, roundTripper http.RoundTripper, log log.Logge return h } +type ResponseDataWithStats struct { + Stats *stats.BuiltinStats `json:"stats"` +} +type ResponseWithStats struct { + Data ResponseDataWithStats `json:"data"` +} + func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { var ( - stats *querier_stats.Stats queryString url.Values ) - // Initialise the stats in the context and make sure it's propagated - // down the request chain. - if f.cfg.QueryStatsEnabled { - var ctx context.Context - stats, ctx = querier_stats.ContextWithEmptyStats(r.Context()) - r = r.WithContext(ctx) - } - defer func() { _ = r.Body.Close() }() - // Buffer the body for later use to track slow queries. + // Buffer the request body for later use to track slow queries. 
 	var buf bytes.Buffer
 	r.Body = http.MaxBytesReader(w, r.Body, f.cfg.MaxBodySize)
 	r.Body = io.NopCloser(io.TeeReader(r.Body, &buf))
@@ -135,17 +127,37 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		hs[h] = vs
 	}
 
+	w.WriteHeader(resp.StatusCode)
+
+	var respBuf bytes.Buffer
 	if f.cfg.QueryStatsEnabled {
-		writeServiceTimingHeader(queryResponseTime, hs, stats)
+		// Buffer the response body so the query stats can be parsed from it later.
+		resp.Body = io.NopCloser(io.TeeReader(resp.Body, &respBuf))
 	}
 
-	w.WriteHeader(resp.StatusCode)
 	// log copy response body error so that we will know even though success response code returned
 	bytesCopied, err := io.Copy(w, resp.Body)
 	if err != nil && !errors.Is(err, syscall.EPIPE) {
 		level.Error(util_log.WithContext(r.Context(), f.log)).Log("msg", "write response body error", "bytesCopied", bytesCopied, "err", err)
 	}
 
+	if f.cfg.QueryStatsEnabled {
+		// Parse the stats field out of the buffered response body.
+		var statsResponse ResponseWithStats
+		if err := json.Unmarshal(respBuf.Bytes(), &statsResponse); err == nil {
+			if statsResponse.Data.Stats != nil {
+				queryString = f.parseRequestQueryString(r, buf)
+				f.reportQueryStats(r, queryString, queryResponseTime, statsResponse.Data.Stats)
+			} else {
+				// Don't fail the request if the stats are missing, just log a warning.
+				level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", errors.New("stats are nil"))
+			}
+		} else {
+			// Don't fail the request if the response body cannot be parsed, just log a warning.
+			level.Warn(util_log.WithContext(r.Context(), f.log)).Log("msg", "error parsing query stats", "err", err)
+		}
+	}
+
 	// Check whether we should parse the query string.
 	shouldReportSlowQuery := f.cfg.LogQueriesLongerThan != 0 && queryResponseTime > f.cfg.LogQueriesLongerThan
 	if shouldReportSlowQuery || f.cfg.QueryStatsEnabled {
@@ -155,9 +167,6 @@ func (f *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 	if shouldReportSlowQuery {
 		f.reportSlowQuery(r, hs, queryString, queryResponseTime)
 	}
-	if f.cfg.QueryStatsEnabled {
-		f.reportQueryStats(r, queryString, queryResponseTime, stats)
-	}
 }
 
 // reportSlowQuery reports slow queries.
@@ -194,25 +203,11 @@ func (f *Handler) reportSlowQuery(r *http.Request, responseHeaders http.Header,
 	level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
 }
 
-func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *querier_stats.Stats) {
-	tenantIDs, err := tenant.TenantIDs(r.Context())
-	if err != nil {
-		return
-	}
-	userID := tenant.JoinTenantIDs(tenantIDs)
-	wallTime := stats.LoadWallTime()
-	numSeries := stats.LoadFetchedSeries()
-	numBytes := stats.LoadFetchedChunkBytes()
+func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, queryResponseTime time.Duration, stats *stats.BuiltinStats) {
 	remoteUser, _, _ := r.BasicAuth()
 
-	// Track stats.
-	f.querySeconds.WithLabelValues(userID).Add(wallTime.Seconds())
-	f.querySeries.WithLabelValues(userID).Add(float64(numSeries))
-	f.queryBytes.WithLabelValues(userID).Add(float64(numBytes))
-	f.activeUsers.UpdateUserTimestamp(userID, time.Now())
-
 	// Log stats.
-	logMessage := append([]interface{}{
+	logMessage := []interface{}{
 		"msg", "query stats",
 		"component", "query-frontend",
 		"method", r.Method,
@@ -220,12 +215,33 @@ func (f *Handler) reportQueryStats(r *http.Request, queryString url.Values, quer
 		"remote_user", remoteUser,
 		"remote_addr", r.RemoteAddr,
 		"response_time", queryResponseTime,
-		"query_wall_time_seconds", wallTime.Seconds(),
-		"fetched_series_count", numSeries,
-		"fetched_chunks_bytes", numBytes,
-	}, formatQueryString(queryString)...)
+		"query_timings_preparation_time", stats.Timings.QueryPreparationTime,
+		"query_timings_eval_total_time", stats.Timings.EvalTotalTime,
+		"query_timings_exec_total_time", stats.Timings.ExecTotalTime,
+		"query_timings_exec_queue_time", stats.Timings.ExecQueueTime,
+		"query_timings_inner_eval_time", stats.Timings.InnerEvalTime,
+		"query_timings_result_sort_time", stats.Timings.ResultSortTime,
+	}
+	if stats.Samples != nil {
+		samples := stats.Samples
+
+		logMessage = append(logMessage, []interface{}{
+			"total_queryable_samples", samples.TotalQueryableSamples,
+			"peak_samples", samples.PeakSamples,
+		}...)
+	}
+
+	logMessage = append(logMessage, formatQueryString(queryString)...)
 
 	level.Info(util_log.WithContext(r.Context(), f.log)).Log(logMessage...)
+
+	// Record metrics.
+	if f.querySeconds != nil {
+		f.querySeconds.WithLabelValues(remoteUser).Observe(queryResponseTime.Seconds())
+	}
+	if f.querySamplesTotal != nil && stats.Samples != nil {
+		f.querySamplesTotal.WithLabelValues(remoteUser).Observe(float64(stats.Samples.TotalQueryableSamples))
+	}
 }
 
 func (f *Handler) parseRequestQueryString(r *http.Request, bodyBuf bytes.Buffer) url.Values {
@@ -262,17 +278,3 @@ func writeError(w http.ResponseWriter, err error) {
 	}
 	server.WriteError(w, err)
 }
-
-func writeServiceTimingHeader(queryResponseTime time.Duration, headers http.Header, stats *querier_stats.Stats) {
-	if stats != nil {
-		parts := make([]string, 0)
-		parts = append(parts, statsValue("querier_wall_time", stats.LoadWallTime()))
-		parts = append(parts, statsValue("response_time", queryResponseTime))
-		headers.Set(ServiceTimingHeaderName, strings.Join(parts, ", "))
-	}
-}
-
-func statsValue(name string, d time.Duration) string {
-	durationInMs := strconv.FormatFloat(float64(d)/float64(time.Millisecond), 'f', -1, 64)
-	return name + ";dur=" + durationInMs
-}
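Reviewer note (not part of the patch): the sketch below is one way the new round-tripper behaviour could be exercised. It assumes a hypothetical in-package test file placed next to internal/cortex/frontend/downstream_roundtripper.go inside the Thanos module, and only uses the constructor signature added in this diff; it checks that the round tripper appends stats=true to the downstream request when the feature is enabled.

package frontend

import (
	"net/http"
	"net/http/httptest"
	"testing"
)

// Hypothetical test, not part of this change: verifies that enabling query
// stats makes the round tripper forward requests with stats=true set.
func TestDownstreamRoundTripperAddsStatsParam(t *testing.T) {
	// Fake downstream querier that records the "stats" query parameter it receives.
	var gotStats string
	downstream := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		gotStats = r.URL.Query().Get("stats")
	}))
	defer downstream.Close()

	rt, err := NewDownstreamRoundTripper(downstream.URL, http.DefaultTransport, true)
	if err != nil {
		t.Fatal(err)
	}

	req, err := http.NewRequest(http.MethodGet, "http://query-frontend/api/v1/query?query=up", nil)
	if err != nil {
		t.Fatal(err)
	}
	resp, err := rt.RoundTrip(req)
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()

	if gotStats != "true" {
		t.Fatalf("expected stats=true on the downstream request, got %q", gotStats)
	}
}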