Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create generic statsReporter in metrics package #8084

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/google/uuid"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
eventingmetrics "knative.dev/eventing/pkg/metrics"
kubeclient "knative.dev/pkg/client/injection/kube/client"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
Expand Down Expand Up @@ -146,7 +147,7 @@ func main() {
logger.Fatal("Error setting up trace publishing", zap.Error(err))
}

reporter := filter.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))
reporter := eventingmetrics.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
Expand Down
58 changes: 51 additions & 7 deletions pkg/broker/filter/filter_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
"net/http"
"time"

"go.opencensus.io/resource"
"go.opencensus.io/tag"
"knative.dev/pkg/metrics/metricskey"

eventingmetrics "knative.dev/eventing/pkg/metrics"

opencensusclient "github.com/cloudevents/sdk-go/observability/opencensus/v2/client"
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/binding"
Expand Down Expand Up @@ -75,7 +81,7 @@
// Handler parses Cloud Events, determines if they pass a filter, and sends them to a subscriber.
type Handler struct {
// reporter reports stats of status code and dispatch time
reporter StatsReporter
reporter eventingmetrics.StatsReporter

eventDispatcher *kncloudevents.Dispatcher

Expand All @@ -88,8 +94,45 @@
EventTypeCreator *eventtype.EventTypeAutoHandler
}

type BrokerArgs struct {
ns string
trigger string
broker string
filterType string
requestType string
requestScheme string
}

var (
triggerFilterTypeKey = tag.MustNewKey(eventingmetrics.LabelFilterType)
triggerFilterRequestTypeKey = tag.MustNewKey("filter_request_type")
triggerFilterRequestSchemeKey = tag.MustNewKey(eventingmetrics.LabelEventScheme)
responseCodeKey = tag.MustNewKey(eventingmetrics.LabelResponseCode)

Check failure on line 110 in pkg/broker/filter/filter_handler.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

var `responseCodeKey` is unused (unused)
responseCodeClassKey = tag.MustNewKey(eventingmetrics.LabelResponseCodeClass)

Check failure on line 111 in pkg/broker/filter/filter_handler.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

var `responseCodeClassKey` is unused (unused)
)

func (args *BrokerArgs) GenerateTag(tags ...tag.Mutator) (context.Context, error) {
ctx := metricskey.WithResource(eventingmetrics.EmptyContext, resource.Resource{
Type: eventingmetrics.ResourceTypeKnativeTrigger,
Labels: map[string]string{
eventingmetrics.LabelNamespaceName: args.ns,
eventingmetrics.LabelBrokerName: args.broker,
eventingmetrics.LabelTriggerName: args.trigger,
},
})
// Note that filterType and filterSource can be empty strings, so they need a special treatment.
ctx, err := tag.New(
ctx,
append(tags,
tag.Insert(triggerFilterTypeKey, eventingmetrics.ValueOrAny(args.filterType)),
tag.Insert(triggerFilterRequestTypeKey, args.requestType),
tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme),
)...)
return ctx, err
}

// NewHandler creates a new Handler and its associated EventReceiver.
func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, brokerInformer v1.BrokerInformer, reporter StatsReporter, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, wc func(ctx context.Context) context.Context) (*Handler, error) {
func NewHandler(logger *zap.Logger, tokenVerifier *auth.OIDCTokenVerifier, oidcTokenProvider *auth.OIDCTokenProvider, triggerInformer v1.TriggerInformer, brokerInformer v1.BrokerInformer, reporter eventingmetrics.StatsReporter, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, wc func(ctx context.Context) context.Context) (*Handler, error) {
kncloudevents.ConfigureConnectionArgs(&kncloudevents.ConnectionArgs{
MaxIdleConns: defaultMaxIdleConnections,
MaxIdleConnsPerHost: defaultMaxIdleConnectionsPerHost,
Expand Down Expand Up @@ -253,10 +296,11 @@

target := broker.Status.Address

reportArgs := &ReportArgs{
reportArgs := &BrokerArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: brokerName,
filterType: triggerFilterAttribute(trigger.Spec.Filter, "type"),
requestType: "reply_forward",
}

Expand Down Expand Up @@ -311,7 +355,7 @@
}
}

reportArgs := &ReportArgs{
reportArgs := &BrokerArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: brokerName,
Expand Down Expand Up @@ -362,7 +406,7 @@
h.logger.Warn("Failed to delete TTL.", zap.Error(err))
}

reportArgs := &ReportArgs{
reportArgs := &BrokerArgs{
ns: trigger.Namespace,
trigger: trigger.Name,
broker: brokerName,
Expand Down Expand Up @@ -405,7 +449,7 @@
h.send(ctx, writer, utils.PassThroughHeaders(request.Header), target, reportArgs, event, trigger, ttl)
}

func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *ReportArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {
func (h *Handler) send(ctx context.Context, writer http.ResponseWriter, headers http.Header, target duckv1.Addressable, reportArgs *BrokerArgs, event *cloudevents.Event, t *eventingv1.Trigger, ttl int32) {
additionalHeaders := headers.Clone()
additionalHeaders.Set(apis.KnNamespaceHeader, t.GetNamespace())

Expand Down Expand Up @@ -538,7 +582,7 @@
return dispatchInfo.ResponseCode, nil
}

func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *ReportArgs) {
func (h *Handler) reportArrivalTime(event *event.Event, reportArgs *BrokerArgs) {
// Record the event processing time. This might be off if the receiver and the filter pods are running in
// different nodes with different clocks.
var arrivalTimeStr string
Expand Down
39 changes: 20 additions & 19 deletions pkg/broker/filter/filter_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"go.uber.org/zap/zaptest"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"knative.dev/eventing/pkg/metrics"
"knative.dev/pkg/apis"
configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/fake"
"knative.dev/pkg/logging"
Expand Down Expand Up @@ -515,14 +516,14 @@ func TestReceiver(t *testing.T) {
if tc.expectedDispatch != fh.requestReceived {
t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fh.requestReceived)
}
if tc.expectedEventCount != reporter.eventCountReported {
t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.eventCountReported)
if tc.expectedEventCount != reporter.EventCountReported {
t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.EventCountReported)
}
if tc.expectedEventDispatchTime != reporter.eventDispatchTimeReported {
t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.eventDispatchTimeReported)
if tc.expectedEventDispatchTime != reporter.EventDispatchTimeReported {
t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.EventDispatchTimeReported)
}
if tc.expectedEventProcessingTime != reporter.eventProcessingTimeReported {
t.Errorf("Incorrect event processing time reported metric. Expected %v, Actual %v", tc.expectedEventProcessingTime, reporter.eventProcessingTimeReported)
if tc.expectedEventProcessingTime != reporter.EventProcessingTimeReported {
t.Errorf("Incorrect event processing time reported metric. Expected %v, Actual %v", tc.expectedEventProcessingTime, reporter.EventProcessingTimeReported)
}
if tc.expectedResponseEvent != nil {
if tc.expectedResponseEvent.SpecVersion() != event.CloudEventsVersionV1 {
Expand Down Expand Up @@ -703,11 +704,11 @@ func TestReceiver_WithSubscriptionsAPI(t *testing.T) {
if tc.expectedDispatch != fh.requestReceived {
t.Errorf("Incorrect dispatch. Expected %v, Actual %v", tc.expectedDispatch, fh.requestReceived)
}
if tc.expectedEventCount != reporter.eventCountReported {
t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.eventCountReported)
if tc.expectedEventCount != reporter.EventCountReported {
t.Errorf("Incorrect event count reported metric. Expected %v, Actual %v", tc.expectedEventCount, reporter.EventCountReported)
}
if tc.expectedEventDispatchTime != reporter.eventDispatchTimeReported {
t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.eventDispatchTimeReported)
if tc.expectedEventDispatchTime != reporter.EventDispatchTimeReported {
t.Errorf("Incorrect event dispatch time reported metric. Expected %v, Actual %v", tc.expectedEventDispatchTime, reporter.EventDispatchTimeReported)
}
// Compare the returned event.
message := cehttp.NewMessageFromHttpResponse(response)
Expand Down Expand Up @@ -741,23 +742,23 @@ func (r *responseWriterWithInvocationsCheck) WriteHeader(statusCode int) {
}

type mockReporter struct {
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
eventCountReported bool
eventDispatchTimeReported bool
eventProcessingTimeReported bool
EventCountReported bool
EventDispatchTimeReported bool
EventProcessingTimeReported bool
}

func (r *mockReporter) ReportEventCount(args *ReportArgs, responseCode int) error {
r.eventCountReported = true
func (r *mockReporter) ReportEventCount(args metrics.MetricArgs, responseCode int) error {
r.EventCountReported = true
return nil
}

func (r *mockReporter) ReportEventDispatchTime(args *ReportArgs, responseCode int, d time.Duration) error {
r.eventDispatchTimeReported = true
func (r *mockReporter) ReportEventDispatchTime(args metrics.MetricArgs, responseCode int, d time.Duration) error {
r.EventDispatchTimeReported = true
return nil
}

func (r *mockReporter) ReportEventProcessingTime(args *ReportArgs, d time.Duration) error {
r.eventProcessingTimeReported = true
func (r *mockReporter) ReportEventProcessingTime(args metrics.MetricArgs, d time.Duration) error {
r.EventProcessingTimeReported = true
return nil
}

Expand Down
52 changes: 46 additions & 6 deletions pkg/broker/filter/stats_reporter_test.go
Leo6Leo marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,65 @@ limitations under the License.
package filter

import (
"context"
"net/http"
"testing"
"time"

"go.opencensus.io/tag"
"knative.dev/pkg/metrics/metricskey"

"go.opencensus.io/resource"
broker "knative.dev/eventing/pkg/broker"
"knative.dev/eventing/pkg/metrics"
"knative.dev/pkg/metrics/metricstest"
_ "knative.dev/pkg/metrics/testing"
)

type TestArgs struct {
ns string
trigger string
broker string
filterType string
requestType string
requestScheme string
container string
uniqueName string
}

func (args *TestArgs) GenerateTag(tags ...tag.Mutator) (context.Context, error) {
ctx := metricskey.WithResource(metrics.EmptyContext, resource.Resource{
Type: metrics.ResourceTypeKnativeTrigger,
Labels: map[string]string{
metrics.LabelNamespaceName: args.ns,
metrics.LabelBrokerName: args.broker,
metrics.LabelTriggerName: args.trigger,
},
})
// Note that filterType and filterSource can be empty strings, so they need a special treatment.
ctx, err := tag.New(
ctx,
append(tags,
tag.Insert(broker.ContainerTagKey, args.container),
tag.Insert(broker.UniqueTagKey, args.uniqueName),
tag.Insert(triggerFilterTypeKey, metrics.ValueOrAny(args.filterType)),
tag.Insert(triggerFilterRequestTypeKey, args.requestType),
tag.Insert(triggerFilterRequestSchemeKey, args.requestScheme),
)...)
return ctx, err
}
func TestStatsReporter(t *testing.T) {
setup()
args := &ReportArgs{
args := &TestArgs{
ns: "testns",
trigger: "testtrigger",
broker: "testbroker",
filterType: "testeventtype",
container: "testcontainer",
uniqueName: "testpod",
}

r := NewStatsReporter("testcontainer", "testpod")
r := metrics.NewStatsReporter("testcontainer", "testpod")

wantTags := map[string]string{
metrics.LabelFilterType: "testeventtype",
Expand Down Expand Up @@ -95,18 +133,20 @@ func TestStatsReporter(t *testing.T) {
func TestReporterEmptySourceAndTypeFilter(t *testing.T) {
setup()

args := &ReportArgs{
args := &TestArgs{
ns: "testns",
trigger: "testtrigger",
broker: "testbroker",
filterType: "",
requestScheme: "http",
container: "testcontainer",
uniqueName: "testpod",
}

r := NewStatsReporter("testcontainer", "testpod")
r := metrics.NewStatsReporter("testcontainer", "testpod")

wantTags := map[string]string{
metrics.LabelFilterType: anyValue,
metrics.LabelFilterType: metrics.AnyValue,
metrics.LabelResponseCode: "202",
metrics.LabelResponseCodeClass: "2xx",
broker.LabelContainerName: "testcontainer",
Expand Down Expand Up @@ -156,5 +196,5 @@ func resetMetrics() {
"event_count",
"event_dispatch_latencies",
"event_processing_latencies")
register()
metrics.Register()
}
Loading
Loading