diff --git a/pkg/clioptions/iooptions/io_options.go b/pkg/clioptions/iooptions/io_options.go index d241aed612d9..bb722236829e 100644 --- a/pkg/clioptions/iooptions/io_options.go +++ b/pkg/clioptions/iooptions/io_options.go @@ -1,8 +1,10 @@ package iooptions import ( + "fmt" "io" "os" + "path" "github.com/spf13/pflag" "k8s.io/cli-runtime/pkg/genericclioptions" @@ -36,6 +38,11 @@ func (o *OutputFlags) ConfigureIOStreams(streams genericclioptions.IOStreams, st return doNothing, nil } + dir := path.Dir(o.OutFile) + if err := os.MkdirAll(dir, os.ModePerm); err != nil { + return doNothing, fmt.Errorf("failed to create parent dir %q: %w", dir, err) + } + f, err := os.OpenFile(o.OutFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640) if err != nil { return doNothing, err diff --git a/pkg/cmd/openshift-tests/dev/dev.go b/pkg/cmd/openshift-tests/dev/dev.go index 1323f4a9cc7c..a28148a13894 100644 --- a/pkg/cmd/openshift-tests/dev/dev.go +++ b/pkg/cmd/openshift-tests/dev/dev.go @@ -160,7 +160,7 @@ a running cluster. logrus.Infof("loaded %d intervals", len(intervals)) logrus.Info("running tests") - junits := legacynetworkmonitortests.TestMultipleSingleSecondDisruptions(intervals) + junits := legacynetworkmonitortests.TestMultipleSingleSecondDisruptions(intervals, nil) for _, junit := range junits { if junit.FailureOutput != nil { logrus.Errorf("FAIL: %s", junit.Name) diff --git a/pkg/cmd/openshift-tests/monitor/run/run_monitor_command.go b/pkg/cmd/openshift-tests/monitor/run/run_monitor_command.go index 8d926c73d085..04ebf8135675 100644 --- a/pkg/cmd/openshift-tests/monitor/run/run_monitor_command.go +++ b/pkg/cmd/openshift-tests/monitor/run/run_monitor_command.go @@ -19,12 +19,11 @@ import ( "github.com/spf13/pflag" + "github.com/openshift/origin/pkg/defaultmonitortests" + "github.com/openshift/origin/pkg/monitor" "github.com/spf13/cobra" "k8s.io/cli-runtime/pkg/genericclioptions" "k8s.io/kubectl/pkg/util/templates" - - "github.com/openshift/origin/pkg/defaultmonitortests" - "github.com/openshift/origin/pkg/monitor" ) type RunMonitorFlags struct { diff --git a/pkg/cmd/openshift-tests/run-disruption/disruption.go b/pkg/cmd/openshift-tests/run-disruption/disruption.go index 3e96b6d949fb..9bd9b1cda296 100644 --- a/pkg/cmd/openshift-tests/run-disruption/disruption.go +++ b/pkg/cmd/openshift-tests/run-disruption/disruption.go @@ -6,46 +6,51 @@ import ( "io" "os" "os/signal" - "path/filepath" + "sync" "syscall" - "time" - "github.com/openshift/origin/pkg/clioptions/clusterinfo" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" - monitorserialization "github.com/openshift/origin/pkg/monitor/serialization" - - "k8s.io/cli-runtime/pkg/genericclioptions" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - "k8s.io/klog/v2" - "k8s.io/kubectl/pkg/util/templates" + "k8s.io/apimachinery/pkg/fields" + "github.com/openshift/origin/pkg/clioptions/iooptions" "github.com/openshift/origin/pkg/disruption/backend" + disruptionci "github.com/openshift/origin/pkg/disruption/ci" "github.com/openshift/origin/pkg/monitor" - "github.com/openshift/origin/pkg/monitor/apiserveravailability" - "github.com/openshift/origin/pkg/monitor/monitorapi" "github.com/openshift/origin/test/extended/util/disruption/controlplane" "github.com/spf13/cobra" + "github.com/spf13/pflag" + corev1 "k8s.io/api/core/v1" + apimachinerywatch "k8s.io/apimachinery/pkg/watch" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/tools/watch" +
"k8s.io/kubectl/pkg/util/templates" ) -// RunAPIDisruptionMonitorOptions sets options for api server disruption monitor -type RunAPIDisruptionMonitorOptions struct { - Out, ErrOut io.Writer +type RunAPIDisruptionMonitorFlags struct { + ConfigFlags *genericclioptions.ConfigFlags + OutputFlags *iooptions.OutputFlags + + ArtifactDir string + LoadBalancerType string + StopConfigMapName string - ArtifactDir string - LoadBalancerType string - ExtraMessage string + genericclioptions.IOStreams } -func NewRunInClusterDisruptionMonitorOptions(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorOptions { - return &RunAPIDisruptionMonitorOptions{ - Out: ioStreams.Out, - ErrOut: ioStreams.ErrOut, +func NewRunInClusterDisruptionMonitorFlags(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorFlags { + return &RunAPIDisruptionMonitorFlags{ + ConfigFlags: genericclioptions.NewConfigFlags(false), + OutputFlags: iooptions.NewOutputOptions(), + IOStreams: ioStreams, } } func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStreams) *cobra.Command { - disruptionOpt := NewRunInClusterDisruptionMonitorOptions(ioStreams) + f := NewRunInClusterDisruptionMonitorFlags(ioStreams) cmd := &cobra.Command{ Use: "run-disruption", Short: "Run API server disruption monitor", @@ -56,122 +61,183 @@ func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStrea SilenceUsage: true, SilenceErrors: true, RunE: func(cmd *cobra.Command, args []string) error { - return disruptionOpt.Run() + ctx, cancelFn := context.WithCancel(context.Background()) + defer cancelFn() + abortCh := make(chan os.Signal, 2) + go func() { + <-abortCh + fmt.Fprintf(f.ErrOut, "Interrupted, terminating\n") + cancelFn() + + sig := <-abortCh + fmt.Fprintf(f.ErrOut, "Interrupted twice, exiting (%s)\n", sig) + switch sig { + case syscall.SIGINT: + os.Exit(130) + default: + os.Exit(0) + } + }() + signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM) + + if err := f.Validate(); err != nil { + return err + } + + o, err := f.ToOptions() + if err != nil { + return err + } + + return o.Run(ctx) }, } - cmd.Flags().StringVar(&disruptionOpt.ArtifactDir, - "artifact-dir", disruptionOpt.ArtifactDir, - "The directory where monitor events will be stored.") - cmd.Flags().StringVar(&disruptionOpt.LoadBalancerType, - "lb-type", disruptionOpt.LoadBalancerType, - "Set load balancer type, available options: internal-lb, service-network, external-lb (default)") - cmd.Flags().StringVar(&disruptionOpt.ExtraMessage, - "extra-message", disruptionOpt.ExtraMessage, - "Add custom label to disruption event message") + + f.AddFlags(cmd.Flags()) + return cmd } -func (opt *RunAPIDisruptionMonitorOptions) Run() error { - restConfig, err := clusterinfo.GetMonitorRESTConfig() - if err != nil { - return err - } +func (f *RunAPIDisruptionMonitorFlags) AddFlags(flags *pflag.FlagSet) { + flags.StringVar(&f.LoadBalancerType, "lb-type", f.LoadBalancerType, "Set load balancer type, available options: internal-lb, service-network, external-lb (default)") + flags.StringVar(&f.StopConfigMapName, "stop-configmap", f.StopConfigMapName, "the name of the configmap that indicates that this pod should stop all watchers.") - lb := backend.ParseStringToLoadBalancerType(opt.LoadBalancerType) + f.ConfigFlags.AddFlags(flags) + f.OutputFlags.BindFlags(flags) +} - ctx, cancelFn := context.WithCancel(context.Background()) - defer cancelFn() - abortCh := make(chan os.Signal, 2) - go func() { - <-abortCh - fmt.Fprintf(opt.ErrOut, "Interrupted, 
terminating\n") - // Give some time to store intervals on disk - time.Sleep(5 * time.Second) - cancelFn() - sig := <-abortCh - fmt.Fprintf(opt.ErrOut, "Interrupted twice, exiting (%s)\n", sig) - switch sig { - case syscall.SIGINT: - os.Exit(130) - default: - os.Exit(0) - } - }() - signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM) +func (f *RunAPIDisruptionMonitorFlags) SetIOStreams(streams genericclioptions.IOStreams) { + f.IOStreams = streams +} - recorder, err := StartAPIAvailability(ctx, restConfig, lb) - if err != nil { - return err +func (f *RunAPIDisruptionMonitorFlags) Validate() error { + if len(f.OutputFlags.OutFile) == 0 { + return fmt.Errorf("output-file must be specified") + } + if len(f.StopConfigMapName) == 0 { + return fmt.Errorf("stop-configmap must be specified") } - go func() { - ticker := time.NewTicker(100 * time.Millisecond) - defer ticker.Stop() - var last time.Time - done := false - for !done { - select { - case <-ticker.C: - case <-ctx.Done(): - done = true - } - events := recorder.Intervals(last, time.Time{}) - if len(events) > 0 { - for _, event := range events { - if !event.From.Equal(event.To) { - continue - } - fmt.Fprintln(opt.Out, event.String()) - } - last = events[len(events)-1].From - } - } - }() - - <-ctx.Done() + return nil +} - // Store intervals to artifact directory - intervals := recorder.Intervals(time.Time{}, time.Time{}) - if len(opt.ExtraMessage) > 0 { - fmt.Fprintf(opt.Out, "\nAppending %s to recorded event message\n", opt.ExtraMessage) - for i, event := range intervals { - intervals[i].Message.HumanMessage = fmt.Sprintf("%s user-provided-message=%s", event.Message.HumanMessage, opt.ExtraMessage) - } +func (f *RunAPIDisruptionMonitorFlags) ToOptions() (*RunAPIDisruptionMonitorOptions, error) { + originalOutStream := f.IOStreams.Out + closeFn, err := f.OutputFlags.ConfigureIOStreams(f.IOStreams, f) + if err != nil { + return nil, err } - eventDir := filepath.Join(opt.ArtifactDir, monitorapi.EventDir) - if err := os.MkdirAll(eventDir, os.ModePerm); err != nil { - fmt.Printf("Failed to create monitor-events directory, err: %v\n", err) - return err + namespace, _, err := f.ConfigFlags.ToRawKubeConfigLoader().Namespace() + if err != nil { + return nil, err + } + if len(namespace) == 0 { + return nil, fmt.Errorf("namespace must be specified") } - timeSuffix := fmt.Sprintf("_%s", time.Now().UTC().Format("20060102-150405")) - if err := monitorserialization.EventsToFile(filepath.Join(eventDir, fmt.Sprintf("e2e-events%s.json", timeSuffix)), intervals); err != nil { - fmt.Printf("Failed to write event data, err: %v\n", err) - return err + restConfig, err := f.ConfigFlags.ToRESTConfig() + if err != nil { + return nil, err + } + kubeClient, err := kubernetes.NewForConfig(restConfig) + if err != nil { + return nil, err } - fmt.Fprintf(opt.Out, "\nEvent data written, exiting\n") - return nil + return &RunAPIDisruptionMonitorOptions{ + KubeClient: kubeClient, + KubeClientConfig: restConfig, + OutputFile: f.OutputFlags.OutFile, + LoadBalancerType: f.LoadBalancerType, + StopConfigMapName: f.StopConfigMapName, + Namespace: namespace, + CloseFn: closeFn, + OriginalOutFile: originalOutStream, + IOStreams: f.IOStreams, + }, nil } -// StartAPIAvailability monitors just the cluster availability -func StartAPIAvailability(ctx context.Context, restConfig *rest.Config, lb backend.LoadBalancerType) (monitorapi.Recorder, error) { - recorder := monitor.NewRecorder() +// RunAPIDisruptionMonitorOptions sets options for api server disruption monitor +type 
RunAPIDisruptionMonitorOptions struct { + KubeClient kubernetes.Interface + KubeClientConfig *rest.Config + OutputFile string + LoadBalancerType string + StopConfigMapName string + Namespace string + + OriginalOutFile io.Writer + CloseFn iooptions.CloseFunc + genericclioptions.IOStreams +} - client, err := kubernetes.NewForConfig(restConfig) - if err != nil { - return nil, err +func (o *RunAPIDisruptionMonitorOptions) Run(ctx context.Context) error { + ctx, cancelFn := context.WithCancel(ctx) + defer cancelFn() + + fmt.Fprintf(o.Out, "Starting up.\n") + + startingContent, err := os.ReadFile(o.OutputFile) + if err != nil && !os.IsNotExist(err) { + return err } - if err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, restConfig, lb); err != nil { - return nil, err + if len(startingContent) > 0 { + // print starting content to the log so that we can simply scrape the log to find all entries at the end. + o.OriginalOutFile.Write(startingContent) } - // read the state of the cluster apiserver client access issues *before* any test (like upgrade) begins - intervals, err := apiserveravailability.APIServerAvailabilityIntervalsFromCluster(client, time.Time{}, time.Time{}) + lb := backend.ParseStringToLoadBalancerType(o.LoadBalancerType) + + recorder := monitor.WrapWithJSONLRecorder(monitor.NewRecorder(), o.IOStreams.Out, nil) + samplers, err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, o.KubeClientConfig, o.KubeClient, lb) if err != nil { - klog.Errorf("error reading initial apiserver availability: %v", err) + return err } - recorder.AddIntervals(intervals...) - return recorder, nil + + go func(ctx context.Context) { + defer cancelFn() + err := o.WaitForStopSignal(ctx) + if err != nil { + fmt.Fprintf(o.ErrOut, "failure waiting for stop: %v\n", err) + } + }(ctx) + + <-ctx.Done() + + fmt.Fprintf(o.Out, "waiting for samplers to stop\n") + wg := sync.WaitGroup{} + for i := range samplers { + wg.Add(1) + go func(sampler disruptionci.Sampler) { + defer wg.Done() + sampler.Stop() + }(samplers[i]) + } + wg.Wait() + fmt.Fprintf(o.Out, "samplers stopped\n") + + return nil +} + +func (o *RunAPIDisruptionMonitorOptions) WaitForStopSignal(ctx context.Context) error { + defer utilruntime.HandleCrash() + + _, err := watch.UntilWithSync( + ctx, + cache.NewListWatchFromClient( + o.KubeClient.CoreV1().RESTClient(), "configmaps", o.Namespace, fields.OneTermEqualSelector("metadata.name", o.StopConfigMapName)), + &corev1.ConfigMap{}, + nil, + func(event apimachinerywatch.Event) (bool, error) { + switch event.Type { + case apimachinerywatch.Added: + return true, nil + case apimachinerywatch.Modified: + return true, nil + } + return false, nil + }, + ) + return err } diff --git a/pkg/defaultmonitortests/types.go b/pkg/defaultmonitortests/types.go index ad172476409a..251a5df81eaa 100644 --- a/pkg/defaultmonitortests/types.go +++ b/pkg/defaultmonitortests/types.go @@ -16,7 +16,8 @@ import ( "github.com/openshift/origin/pkg/monitortests/kubeapiserver/apiservergracefulrestart" "github.com/openshift/origin/pkg/monitortests/kubeapiserver/apiunreachablefromclientmetrics" "github.com/openshift/origin/pkg/monitortests/kubeapiserver/auditloganalyzer" - "github.com/openshift/origin/pkg/monitortests/kubeapiserver/disruptionlegacyapiservers" + "github.com/openshift/origin/pkg/monitortests/kubeapiserver/disruptionexternalapiserver" + "github.com/openshift/origin/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver" "github.com/openshift/origin/pkg/monitortests/kubeapiserver/disruptionnewapiserver"
"github.com/openshift/origin/pkg/monitortests/kubeapiserver/faultyloadbalancer" "github.com/openshift/origin/pkg/monitortests/kubeapiserver/generationanalyzer" @@ -44,6 +45,7 @@ import ( "github.com/openshift/origin/pkg/monitortests/testframework/disruptionexternalservicemonitoring" "github.com/openshift/origin/pkg/monitortests/testframework/disruptionserializer" "github.com/openshift/origin/pkg/monitortests/testframework/e2etestanalyzer" + "github.com/openshift/origin/pkg/monitortests/testframework/intervalserializer" "github.com/openshift/origin/pkg/monitortests/testframework/knownimagechecker" "github.com/openshift/origin/pkg/monitortests/testframework/legacytestframeworkmonitortests" @@ -111,8 +113,9 @@ func newDefaultMonitorTests(info monitortestframework.MonitorTestInitializationI monitorTestRegistry.AddMonitorTestOrDie("image-registry-availability", "Image Registry", disruptionimageregistry.NewAvailabilityInvariant()) - monitorTestRegistry.AddMonitorTestOrDie("apiserver-availability", "kube-apiserver", disruptionlegacyapiservers.NewAvailabilityInvariant()) - monitorTestRegistry.AddMonitorTestOrDie("apiserver-new-disruption-invariant", "kube-apiserver", disruptionnewapiserver.NewDisruptionInvariant()) + monitorTestRegistry.AddMonitorTestOrDie("apiserver-disruption-invariant", "kube-apiserver", disruptionnewapiserver.NewDisruptionInvariant()) + monitorTestRegistry.AddMonitorTestOrDie("apiserver-external-availability", "kube-apiserver", disruptionexternalapiserver.NewExternalDisruptionInvariant(info)) + monitorTestRegistry.AddMonitorTestOrDie("apiserver-incluster-availability", "kube-apiserver", disruptioninclusterapiserver.NewInvariantInClusterDisruption(info)) monitorTestRegistry.AddMonitorTestOrDie("pod-network-avalibility", "Network / ovn-kubernetes", disruptionpodnetwork.NewPodNetworkAvalibilityInvariant(info)) monitorTestRegistry.AddMonitorTestOrDie("service-type-load-balancer-availability", "Networking / router", disruptionserviceloadbalancer.NewAvailabilityInvariant()) diff --git a/pkg/disruption/backend/disruption/interval.go b/pkg/disruption/backend/disruption/interval.go index 45f5783fbc79..09da211ce2fa 100644 --- a/pkg/disruption/backend/disruption/interval.go +++ b/pkg/disruption/backend/disruption/interval.go @@ -1,6 +1,9 @@ package disruption import ( + "fmt" + "strings" + "github.com/openshift/origin/pkg/disruption/backend" backendsampler "github.com/openshift/origin/pkg/disruption/backend/sampler" "github.com/openshift/origin/pkg/monitor/monitorapi" @@ -109,7 +112,15 @@ func (t *intervalTracker) collect(result backend.SampleResult) { case previous.Succeeded() && current.Succeeded(): return case !previous.Succeeded() && !current.Succeeded(): - if previous.Error() == current.Error() { + previousErrorCensored := previous.Error() + currentErrorCensored := current.Error() + // Censor sample-id from error message + if previous.Sample != nil && current.Sample != nil { + previousErrorCensored = strings.ReplaceAll(previous.Error(), fmt.Sprintf("&sample-id=%d", previous.Sample.ID), "") + currentErrorCensored = strings.ReplaceAll(current.Error(), fmt.Sprintf("&sample-id=%d", current.Sample.ID), "") + } + + if previousErrorCensored == currentErrorCensored { return } // both previous and current failed, but with different errors diff --git a/pkg/disruption/ci/backend_sampler.go b/pkg/disruption/ci/backend_sampler.go index 4ba243002e4a..61875a84c038 100644 --- a/pkg/disruption/ci/backend_sampler.go +++ b/pkg/disruption/ci/backend_sampler.go @@ -37,6 +37,7 @@ type BackendSampler 
struct { hostNameDecoder backend.HostNameDecoderWithRunner lock sync.Mutex cancel context.CancelFunc + samplerFinished chan struct{} } func (bs *BackendSampler) GetTargetServerName() string { @@ -68,6 +69,8 @@ func (bs *BackendSampler) GetURL() (string, error) { } func (bs *BackendSampler) RunEndpointMonitoring(ctx context.Context, m monitorapi.RecorderWriter, eventRecorder events.EventRecorder) error { + defer close(bs.samplerFinished) + ctx, cancel := context.WithCancel(ctx) bs.lock.Lock() bs.cancel = cancel @@ -135,4 +138,7 @@ func (bs *BackendSampler) Stop() { if cancel != nil { cancel() } + + // wait for the sampler to be done + <-bs.samplerFinished } diff --git a/pkg/disruption/ci/factory.go b/pkg/disruption/ci/factory.go index b0c203271fe9..f6c67b1c6adf 100644 --- a/pkg/disruption/ci/factory.go +++ b/pkg/disruption/ci/factory.go @@ -6,6 +6,8 @@ import ( "sync" "time" + "k8s.io/client-go/kubernetes" + "github.com/openshift/origin/pkg/disruption/backend" "github.com/openshift/origin/pkg/disruption/backend/disruption" "github.com/openshift/origin/pkg/disruption/backend/logger" @@ -34,10 +36,11 @@ type Factory interface { // NewDisruptionTestFactory returns a shared disruption test factory that uses // the given rest Config object to create new disruption test instances. -func NewDisruptionTestFactory(config *rest.Config) Factory { +func NewDisruptionTestFactory(config *rest.Config, kubeClient kubernetes.Interface) Factory { return &testFactory{ dependency: &restConfigDependency{ - config: config, + config: config, + kubeClient: kubeClient, }, } } @@ -138,9 +141,6 @@ type dependency interface { // GetHostNameDecoder returns the appropriate HostNameDecoder instance. GetHostNameDecoder() (backend.HostNameDecoderWithRunner, error) - - // GetRestConfig returns kubeconfig - GetRestConfig() *rest.Config } type testFactory struct { @@ -189,12 +189,14 @@ func (b *testFactory) New(c TestConfiguration) (Sampler, error) { pc := backendsampler.NewSampleProducerConsumer(client, requestor, backendsampler.NewResponseChecker(), collector) runner := sampler.NewWithProducerConsumer(c.SampleInterval, pc) + samplerFinished := make(chan struct{}) backendSampler := &BackendSampler{ TestConfiguration: c, SampleRunner: runner, wantEventRecorderAndMonitor: []backend.WantEventRecorderAndMonitorRecorder{b.wantMonitorAndRecorder, want}, baseURL: requestor.GetBaseURL(), hostNameDecoder: b.hostNameDecoder, + samplerFinished: samplerFinished, } return backendSampler, nil } @@ -202,7 +204,8 @@ func (b *testFactory) New(c TestConfiguration) (Sampler, error) { // restConfigDependency is used by the factory when we want to create // a disruption test instance from a rest Config. 
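+// NOTE: the dependency now carries an injected kubernetes.Interface; GetHostNameDecoder uses it directly instead of constructing a clientset from the rest.Config (GetRestConfig is gone).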
type restConfigDependency struct { - config *rest.Config + config *rest.Config + kubeClient kubernetes.Interface } func (r *restConfigDependency) NewTransport(tc TestConfiguration) (http.RoundTripper, error) { @@ -221,10 +224,10 @@ func (r *restConfigDependency) NewTransport(tc TestConfiguration) (http.RoundTri } return rt, nil } -func (r *restConfigDependency) HostName() string { return r.config.Host } -func (r *restConfigDependency) GetHostNameDecoder() (backend.HostNameDecoderWithRunner, error) { - return NewAPIServerIdentityToHostNameDecoder(r.config) +func (r *restConfigDependency) HostName() string { + return r.config.Host } -func (r *restConfigDependency) GetRestConfig() *rest.Config { - return r.config + +func (r *restConfigDependency) GetHostNameDecoder() (backend.HostNameDecoderWithRunner, error) { + return NewAPIServerIdentityToHostNameDecoder(r.kubeClient) } diff --git a/pkg/disruption/ci/host_decoder.go b/pkg/disruption/ci/host_decoder.go index 03381a492bcc..bdcb9e4bdeda 100644 --- a/pkg/disruption/ci/host_decoder.go +++ b/pkg/disruption/ci/host_decoder.go @@ -12,21 +12,16 @@ import ( "golang.org/x/net/context" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" - clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/kubernetes" coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" - "k8s.io/client-go/rest" "k8s.io/kubernetes/test/e2e/framework" ) // NewAPIServerIdentityToHostNameDecoder returns a new // HostNameDecoder instance that is capable of decoding the // APIServerIdentity into the human readable hostname. -func NewAPIServerIdentityToHostNameDecoder(config *rest.Config) (*apiServerIdentityDecoder, error) { - clients, err := clientset.NewForConfig(config) - if err != nil { - return nil, err - } - client := clients.CoordinationV1().Leases(metav1.NamespaceSystem) +func NewAPIServerIdentityToHostNameDecoder(kubeClient kubernetes.Interface) (*apiServerIdentityDecoder, error) { + client := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem) return &apiServerIdentityDecoder{client: client}, nil } diff --git a/pkg/monitor/monitorapi/disruption.go b/pkg/monitor/monitorapi/disruption.go index 6320c089c0de..619dd5450e66 100644 --- a/pkg/monitor/monitorapi/disruption.go +++ b/pkg/monitor/monitorapi/disruption.go @@ -23,3 +23,7 @@ func BackendDisruptionSeconds(backendDisruptionName string, events Intervals) (t func IsDisruptionEvent(eventInterval Interval) bool { return eventInterval.Source == SourceDisruption } + +func HasRealLoadBalancer(eventInterval Interval) bool { + return eventInterval.Locator.Keys[LocatorLoadBalancerKey] != "localhost" +} diff --git a/pkg/monitortestframework/types.go b/pkg/monitortestframework/types.go index 4443fb01b900..959c706fed1e 100644 --- a/pkg/monitortestframework/types.go +++ b/pkg/monitortestframework/types.go @@ -36,6 +36,8 @@ type MonitorTestInitializationInfo struct { DisableMonitorTests []string } +type OpenshiftTestImageGetterFunc func(ctx context.Context, adminRESTConfig *rest.Config) (imagePullSpec string, notSupportedReason string, err error) + type MonitorTest interface { // StartCollection is responsible for setting up all resources required for collection of data on the cluster. // An error will not stop execution, but will cause a junit failure that will cause the job run to fail. 
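The collector added below pairs with the stop protocol in run-disruption above: each in-cluster poller blocks in WaitForStopSignal until a ConfigMap with the name passed via --stop-configmap appears in its namespace, stops its samplers, and exits; only then are its pod logs scraped for intervals. A minimal sketch of the producer side of that handshake, assuming kubeClient, ctx, and namespace are in scope (the name stop-configmap matches the deployment manifests in this PR):

	// signal every poller in the namespace to shut down
	stopCM := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "stop-configmap"}}
	if _, err := kubeClient.CoreV1().ConfigMaps(namespace).Create(ctx, stopCM, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
		return fmt.Errorf("error creating stop configmap: %v", err)
	}
	// pollers observe the Added event through their field-selector watch and begin shutting down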
diff --git a/pkg/monitortestlibrary/disruptionlibrary/collect.go b/pkg/monitortestlibrary/disruptionlibrary/collect.go new file mode 100644 index 000000000000..4d5aaa494fa7 --- /dev/null +++ b/pkg/monitortestlibrary/disruptionlibrary/collect.go @@ -0,0 +1,82 @@ +package disruptionlibrary + +import ( + "bufio" + "bytes" + "context" + "fmt" + "strings" + + "github.com/openshift/origin/pkg/monitor/monitorapi" + monitorserialization "github.com/openshift/origin/pkg/monitor/serialization" + "github.com/openshift/origin/pkg/test/ginkgo/junitapi" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/kubernetes" +) + +func CollectIntervalsForPods(ctx context.Context, kubeClient kubernetes.Interface, sig string, namespace string, labelSelector labels.Selector) (monitorapi.Intervals, []*junitapi.JUnitTestCase, []error) { + pollerPods, err := kubeClient.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector.String(), + }) + if err != nil { + return nil, nil, []error{err} + } + + retIntervals := monitorapi.Intervals{} + junits := []*junitapi.JUnitTestCase{} + errs := []error{} + buf := &bytes.Buffer{} + podsWithoutIntervals := []string{} + for _, pollerPod := range pollerPods.Items { + fmt.Fprintf(buf, "\n\nLogs for -n %v pod/%v\n", pollerPod.Namespace, pollerPod.Name) + req := kubeClient.CoreV1().Pods(namespace).GetLogs(pollerPod.Name, &corev1.PodLogOptions{}) + logStream, err := req.Stream(ctx) + if err != nil { + errs = append(errs, err) + continue + } + + foundInterval := false + scanner := bufio.NewScanner(logStream) + for scanner.Scan() { + line := scanner.Bytes() + buf.Write(line) + buf.Write([]byte("\n")) + if len(line) == 0 { + continue + } + + // not all lines are json, ignore errors.
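+ // each non-empty line is expected to be one monitorapi.Interval serialized by the JSONL recorder; startup/shutdown chatter fails to parse and is skipped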
+ if currInterval, err := monitorserialization.IntervalFromJSON(line); err == nil { + retIntervals = append(retIntervals, *currInterval) + foundInterval = true + } + } + if !foundInterval { + podsWithoutIntervals = append(podsWithoutIntervals, pollerPod.Name) + } + } + + failures := []string{} + if len(podsWithoutIntervals) > 0 { + failures = append(failures, fmt.Sprintf("%d pods lacked sampler output: [%v]", len(podsWithoutIntervals), strings.Join(podsWithoutIntervals, ", "))) + } + if len(pollerPods.Items) == 0 { + failures = append(failures, fmt.Sprintf("no pods found for poller %v", labelSelector)) + } + + logJunit := &junitapi.JUnitTestCase{ + Name: fmt.Sprintf("[%s] can collect %v poller pod logs", sig, labelSelector), + SystemOut: string(buf.Bytes()), + } + if len(failures) > 0 { + logJunit.FailureOutput = &junitapi.FailureOutput{ + Output: strings.Join(failures, "\n"), + } + } + junits = append(junits, logJunit) + + return retIntervals, junits, errs +} diff --git a/pkg/monitortestlibrary/disruptionlibrary/deployment.go b/pkg/monitortestlibrary/disruptionlibrary/deployment.go new file mode 100644 index 000000000000..b27ba30ce817 --- /dev/null +++ b/pkg/monitortestlibrary/disruptionlibrary/deployment.go @@ -0,0 +1,17 @@ +package disruptionlibrary + +import ( + appsv1 "k8s.io/api/apps/v1" +) + +func UpdateDeploymentENVs(deployment *appsv1.Deployment, deploymentID, serviceClusterIP string) *appsv1.Deployment { + for i, env := range deployment.Spec.Template.Spec.Containers[0].Env { + if env.Name == "DEPLOYMENT_ID" { + deployment.Spec.Template.Spec.Containers[0].Env[i].Value = deploymentID + } else if env.Name == "SERVICE_CLUSTER_IP" && len(serviceClusterIP) > 0 { + deployment.Spec.Template.Spec.Containers[0].Env[i].Value = serviceClusterIP + } + } + + return deployment +} diff --git a/pkg/monitortests/kubeapiserver/disruptionexternalapiserver/monitortest.go b/pkg/monitortests/kubeapiserver/disruptionexternalapiserver/monitortest.go new file mode 100644 index 000000000000..a2d6dfd630e4 --- /dev/null +++ b/pkg/monitortests/kubeapiserver/disruptionexternalapiserver/monitortest.go @@ -0,0 +1,262 @@ +package disruptionexternalapiserver + +import ( + "context" + "fmt" + "time" + + "github.com/openshift/origin/pkg/monitor/backenddisruption" + "github.com/openshift/origin/pkg/monitor/monitorapi" + "github.com/openshift/origin/pkg/monitortestframework" + "github.com/openshift/origin/pkg/monitortestlibrary/disruptionlibrary" + "github.com/openshift/origin/pkg/test/ginkgo/junitapi" + + exutil "github.com/openshift/origin/test/extended/util" + "github.com/sirupsen/logrus" + + imagev1 "github.com/openshift/client-go/image/clientset/versioned/typed/image/v1" + oauthv1 "github.com/openshift/client-go/oauth/clientset/versioned/typed/oauth/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" +) + +type InvariantExternalDisruption struct { + notSupportedReason error + disruptionCheckers []*disruptionlibrary.Availability + + adminRESTConfig *rest.Config + kubeClient kubernetes.Interface +} + +func NewExternalDisruptionInvariant(info monitortestframework.MonitorTestInitializationInfo) monitortestframework.MonitorTest { + return &InvariantExternalDisruption{} +} + +func createBackendSampler(clusterConfig *rest.Config, disruptionBackendName, url string, connectionType monitorapi.BackendConnectionType) (*backenddisruption.BackendSampler, error) { + backendSampler, err := 
backenddisruption.NewAPIServerBackend(clusterConfig, disruptionBackendName, url, connectionType) + if err != nil { + return nil, err + } + backendSampler = backendSampler.WithUserAgent(fmt.Sprintf("openshift-external-backend-sampler-%s-%s", connectionType, disruptionBackendName)) + return backendSampler, nil +} + +func testNames(disruptionBackendName, apiserver string) (string, string) { + return fmt.Sprintf("[sig-api-machinery] disruption/%s apiserver/%s connection/%s should be available throughout the test", disruptionBackendName, apiserver, "new"), + fmt.Sprintf("[sig-api-machinery] disruption/%s apiserver/%s connection/%s should be available throughout the test", disruptionBackendName, apiserver, "reused") +} + +func createApiServerChecker(adminRESTConfig *rest.Config, disruptionBackendName, apiserver, url string) (*disruptionlibrary.Availability, error) { + newConnectionTestName, reusedConnectionTestName := testNames(disruptionBackendName, apiserver) + + newConnections, err := createBackendSampler(adminRESTConfig, disruptionBackendName, url, monitorapi.NewConnectionType) + if err != nil { + return nil, err + } + reusedConnections, err := createBackendSampler(adminRESTConfig, disruptionBackendName, url, monitorapi.ReusedConnectionType) + if err != nil { + return nil, err + } + return disruptionlibrary.NewAvailabilityInvariant( + newConnectionTestName, reusedConnectionTestName, + newConnections, reusedConnections, + ), nil +} + +func createKubeApiChecker(adminRESTConfig *rest.Config, url string, cache bool) (*disruptionlibrary.Availability, error) { + disruptionBackendName := "kube-api" + if cache { + disruptionBackendName = fmt.Sprintf("cache-%s", disruptionBackendName) + } + return createApiServerChecker(adminRESTConfig, disruptionBackendName, "kube-apiserver", url) +} + +func createOpenshiftApiChecker(adminRESTConfig *rest.Config, url string, cache bool) (*disruptionlibrary.Availability, error) { + disruptionBackendName := "openshift-api" + if cache { + disruptionBackendName = fmt.Sprintf("cache-%s", disruptionBackendName) + } + return createApiServerChecker(adminRESTConfig, disruptionBackendName, "openshift-apiserver", url) +} + +func createOauthApiChecker(adminRESTConfig *rest.Config, url string, cache bool) (*disruptionlibrary.Availability, error) { + disruptionBackendName := "oauth-api" + if cache { + disruptionBackendName = fmt.Sprintf("cache-%s", disruptionBackendName) + } + return createApiServerChecker(adminRESTConfig, disruptionBackendName, "oauth-apiserver", url) +} + +func (i *InvariantExternalDisruption) StartCollection(ctx context.Context, adminRESTConfig *rest.Config, recorder monitorapi.RecorderWriter) error { + var err error + + log := logrus.WithField("monitorTest", "apiserver-external-availability").WithField("func", "StartCollection") + log.Infof("starting external API monitors") + + i.adminRESTConfig = adminRESTConfig + + i.kubeClient, err = kubernetes.NewForConfig(i.adminRESTConfig) + if err != nil { + return err + } + isMicroShift, err := exutil.IsMicroShiftCluster(i.kubeClient) + if err != nil { + return fmt.Errorf("unable to determine if cluster is MicroShift: %v", err) + } + if isMicroShift { + i.notSupportedReason = &monitortestframework.NotSupportedError{ + Reason: "platform MicroShift not supported", + } + } + if i.notSupportedReason != nil { + return i.notSupportedReason + } + + namespaces, err := i.kubeClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("unable to list namespaces for cached kube api checker: 
%v", err) + } + namespacesListRevision := namespaces.ResourceVersion + + imageClient, err := imagev1.NewForConfig(i.adminRESTConfig) + if err != nil { + return fmt.Errorf("unable to create imagestream client for openshift-apiserver api checker: %v", err) + } + + imageStreamNS := "openshift" + imagestreams, err := imageClient.ImageStreams(imageStreamNS).List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("unable to list imagestreams for openshift-apiserver api checker: %v", err) + } + if len(imagestreams.Items) == 0 { + return fmt.Errorf("found no suitable imagestream for openshift-apiserver api checker: %v", imagestreams) + } + imageStreamName := imagestreams.Items[0].Name + imageStreamRevision := imagestreams.Items[0].ResourceVersion + + oauthClient, err := oauthv1.NewForConfig(i.adminRESTConfig) + if err != nil { + return fmt.Errorf("unable to create oauth client for oauth-apiserver api checker: %v", err) + } + oauthclients, err := oauthClient.OAuthClients().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return fmt.Errorf("unable to list oauth for oauth-apiserver api checker: %v", err) + } + if len(oauthclients.Items) == 0 { + return fmt.Errorf("found no suitable imagestream for oauth-apiserver api checker: %v", err) + } + oauthClientName := oauthclients.Items[0].Name + oauthClientRevision := oauthclients.Items[0].ResourceVersion + + kubeApiChecker, err := createKubeApiChecker(i.adminRESTConfig, "/api/v1/namespaces/default", false) + if err != nil { + return fmt.Errorf("unable to create kube api checker: %v", err) + } + i.disruptionCheckers = append(i.disruptionCheckers, kubeApiChecker) + + kubeApiCachedChecker, err := createKubeApiChecker(i.adminRESTConfig, fmt.Sprintf("/api/v1/namespaces/default?resourceVersion=%s", namespacesListRevision), true) + if err != nil { + return fmt.Errorf("unable to create cached kube api checker: %v", err) + } + i.disruptionCheckers = append(i.disruptionCheckers, kubeApiCachedChecker) + + openshiftApiChecker, err := createOpenshiftApiChecker(i.adminRESTConfig, fmt.Sprintf("/apis/image.openshift.io/v1/namespaces/%s/imagestreams", imageStreamNS), false) + if err != nil { + return fmt.Errorf("unable to create openshift api checker: %v", err) + } + i.disruptionCheckers = append(i.disruptionCheckers, openshiftApiChecker) + + openshiftApiCachedChecker, err := createOpenshiftApiChecker(i.adminRESTConfig, fmt.Sprintf("/apis/image.openshift.io/v1/namespaces/%s/imagestreams/%s?resourceVersion=%s", imageStreamNS, imageStreamName, imageStreamRevision), true) + if err != nil { + return fmt.Errorf("unable to create cached openshift api checker: %v", err) + } + i.disruptionCheckers = append(i.disruptionCheckers, openshiftApiCachedChecker) + + oauthApiChecker, err := createOauthApiChecker(i.adminRESTConfig, "/apis/oauth.openshift.io/v1/oauthclients", false) + if err != nil { + return fmt.Errorf("unable to create oauth api checker: %v", err) + } + + i.disruptionCheckers = append(i.disruptionCheckers, oauthApiChecker) + + oauthApiCachedChecker, err := createOauthApiChecker(i.adminRESTConfig, fmt.Sprintf("/apis/oauth.openshift.io/v1/oauthclients/%s?resourceVersion=%s", oauthClientName, oauthClientRevision), true) + if err != nil { + return fmt.Errorf("unable to create cached openshift api checker: %v", err) + } + i.disruptionCheckers = append(i.disruptionCheckers, oauthApiCachedChecker) + + for n := range i.disruptionCheckers { + if err := i.disruptionCheckers[n].StartCollection(ctx, adminRESTConfig, recorder); err != nil { + 
return err + } + } + + return nil +} + +func (i *InvariantExternalDisruption) CollectData(ctx context.Context, storageDir string, beginning time.Time, end time.Time) (monitorapi.Intervals, []*junitapi.JUnitTestCase, error) { + log := logrus.WithField("monitorTest", "apiserver-external-availability").WithField("func", "CollectData") + log.Infof("collecting intervals") + if i.notSupportedReason != nil { + return nil, nil, i.notSupportedReason + } + // we failed and indicated it during setup. + if i.disruptionCheckers == nil { + return nil, nil, nil + } + + intervals := monitorapi.Intervals{} + junits := []*junitapi.JUnitTestCase{} + errs := []error{} + + for n := range i.disruptionCheckers { + // we failed and indicated it during setup. + if i.disruptionCheckers[n] == nil { + continue + } + + localIntervals, localJunits, localErr := i.disruptionCheckers[n].CollectData(ctx) + intervals = append(intervals, localIntervals...) + junits = append(junits, localJunits...) + if localErr != nil { + errs = append(errs, localErr) + } + } + + return intervals, junits, utilerrors.NewAggregate(errs) +} + +func (i *InvariantExternalDisruption) ConstructComputedIntervals(ctx context.Context, startingIntervals monitorapi.Intervals, _ monitorapi.ResourcesMap, beginning time.Time, end time.Time) (constructedIntervals monitorapi.Intervals, err error) { + return nil, nil +} + +func (i *InvariantExternalDisruption) EvaluateTestsFromConstructedIntervals(ctx context.Context, finalIntervals monitorapi.Intervals) ([]*junitapi.JUnitTestCase, error) { + + junits := []*junitapi.JUnitTestCase{} + errs := []error{} + + for n := range i.disruptionCheckers { + // we failed and indicated it during setup. + if i.disruptionCheckers[n] == nil { + continue + } + + localJunits, localErr := i.disruptionCheckers[n].EvaluateTestsFromConstructedIntervals(ctx, finalIntervals) + junits = append(junits, localJunits...) 
+ if localErr != nil { + errs = append(errs, localErr) + } + } + + return junits, utilerrors.NewAggregate(errs) +} + +func (i *InvariantExternalDisruption) WriteContentToStorage(ctx context.Context, storageDir, timeSuffix string, finalIntervals monitorapi.Intervals, finalResourceState monitorapi.ResourcesMap) error { + return nil +} + +func (i *InvariantExternalDisruption) Cleanup(ctx context.Context) error { + return nil +} diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/clusterrole-monitor.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/clusterrole-monitor.yaml new file mode 100644 index 000000000000..130b596f5e3e --- /dev/null +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/clusterrole-monitor.yaml @@ -0,0 +1,46 @@ +kind: ClusterRole +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: e2e-disruption-monitor +rules: + - verbs: + - list + apiGroups: + - '' + - oauth.openshift.io + resources: + - oauthclients + - verbs: + - get + - list + apiGroups: + - '' + resources: + - namespaces + - verbs: + - create + - get + - list + - update + - delete + - watch + apiGroups: + - '' + resources: + - configmaps + - verbs: + - list + apiGroups: + - image.openshift.io + resources: + - imagestreams + - verbs: + - create + - get + - list + - update + - delete + apiGroups: + - coordination.k8s.io + resources: + - leases diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/crb-monitor.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/crb-monitor.yaml new file mode 100644 index 000000000000..43e0c6f5ea5c --- /dev/null +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/crb-monitor.yaml @@ -0,0 +1,11 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + generateName: e2e-disruption-monitor- +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: e2e-disruption-monitor +subjects: +- kind: ServiceAccount + name: disruption-monitor-sa diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/crb-privileged.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/crb-privileged.yaml new file mode 100644 index 000000000000..4ae30320d328 --- /dev/null +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/crb-privileged.yaml @@ -0,0 +1,11 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + generateName: e2e-disruption-monitor-privileged- +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: system:openshift:scc:privileged +subjects: +- kind: ServiceAccount + name: disruption-monitor-sa diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-internal-lb.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-internal-lb.yaml new file mode 100644 index 000000000000..d9a62bbbf49e --- /dev/null +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-internal-lb.yaml @@ -0,0 +1,63 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: internal-lb-monitor +spec: + replicas: 1 + selector: + matchLabels: + app: internal-lb-monitor + apiserver.openshift.io/disruption-actor: poller + template: + metadata: + labels: + app: internal-lb-monitor + apiserver.openshift.io/disruption-actor: poller + spec: + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - 
labelSelector: + matchExpressions: + - key: app + operator: In + values: + - internal-lb-monitor + topologyKey: "kubernetes.io/hostname" + containers: + - name: internal-lb + command: + - openshift-tests + - run-disruption + - --output-file=/var/log/disruption-data/monitor-events/internal-lb-$(DEPLOYMENT_ID).txt + - --lb-type=$(LOAD_BALANCER) + - --stop-configmap=stop-configmap + env: + - name: KUBERNETES_SERVICE_HOST + value: api-int.foo.bar + - name: KUBERNETES_SERVICE_PORT + value: "6443" + - name: LOAD_BALANCER + value: "internal-lb" + - name: DEPLOYMENT_ID + #to be overwritten at deployment initialization time + value: "DEFAULT" + image: to-be-replaced + volumeMounts: + - mountPath: /var/log/disruption-data + name: artifacts + securityContext: + privileged: true + runAsUser: 0 + hostNetwork: true + serviceAccountName: disruption-monitor-sa + volumes: + - hostPath: + path: /var/log/disruption-data + type: DirectoryOrCreate + name: artifacts + tolerations: + - key: node-role.kubernetes.io/master + effect: NoSchedule + - key: node-role.kubernetes.io/edge + effect: NoSchedule diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-localhost.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-localhost.yaml new file mode 100644 index 000000000000..b6f8b778fa5f --- /dev/null +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-localhost.yaml @@ -0,0 +1,67 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: localhost-monitor +spec: + replicas: 1 + selector: + matchLabels: + app: localhost-monitor + apiserver.openshift.io/disruption-actor: poller + template: + metadata: + labels: + app: localhost-monitor + apiserver.openshift.io/disruption-actor: poller + spec: + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: app + operator: In + values: + - localhost-monitor + topologyKey: "kubernetes.io/hostname" + containers: + - name: localhost + command: + - openshift-tests + - run-disruption + - --output-file=/var/log/disruption-data/monitor-events/localhost-monitor-$(DEPLOYMENT_ID).txt + - --lb-type=$(LOAD_BALANCER) + - --stop-configmap=stop-configmap + env: + - name: KUBECONFIG + value: "/kubeconfigs/localhost.kubeconfig" + - name: LOAD_BALANCER + value: localhost + - name: DEPLOYMENT_ID + #to be overwritten at deployment initialization time + value: "DEFAULT" + image: to-be-replaced + volumeMounts: + - mountPath: /var/log/disruption-data + name: artifacts + - mountPath: /kubeconfigs + name: node-kubeconfigs + securityContext: + privileged: true + runAsUser: 0 + nodeSelector: + node-role.kubernetes.io/control-plane: "" + hostNetwork: true + serviceAccountName: disruption-monitor-sa + volumes: + - hostPath: + path: /var/log/disruption-data + type: DirectoryOrCreate + name: artifacts + - hostPath: + path: /etc/kubernetes/static-pod-resources/kube-apiserver-certs/secrets/node-kubeconfigs + type: Directory + name: node-kubeconfigs + tolerations: + - key: node-role.kubernetes.io/master + effect: NoSchedule diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-service-network.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-service-network.yaml new file mode 100644 index 000000000000..7cead915de01 --- /dev/null +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/dep-service-network.yaml @@ -0,0 +1,58 @@ +apiVersion: apps/v1 +kind: Deployment 
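+# service-network poller: reaches the apiserver through the kubernetes service ClusterIP from the pod network (LOAD_BALANCER=service-network below)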
+metadata: + name: service-network-monitor +spec: + replicas: 1 + selector: + matchLabels: + app: service-network-monitor + apiserver.openshift.io/disruption-actor: poller + template: + metadata: + labels: + app: service-network-monitor + apiserver.openshift.io/disruption-actor: poller + spec: + affinity: + podAntiAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + - labelSelector: + matchExpressions: + - key: app + operator: In + values: + - service-network-monitor + topologyKey: "kubernetes.io/hostname" + containers: + - name: service-network + command: + - openshift-tests + - run-disruption + - --output-file=/var/log/disruption-data/monitor-events/service-network-monitor-$(DEPLOYMENT_ID).txt + - --lb-type=$(LOAD_BALANCER) + - --stop-configmap=stop-configmap + env: + - name: LOAD_BALANCER + value: service-network + - name: DEPLOYMENT_ID + #to be overwritten at deployment initialization time + value: "DEFAULT" + image: to-be-replaced + volumeMounts: + - mountPath: /var/log/disruption-data + name: artifacts + securityContext: + privileged: true + runAsUser: 0 + serviceAccountName: disruption-monitor-sa + volumes: + - hostPath: + path: /var/log/disruption-data + type: DirectoryOrCreate + name: artifacts + tolerations: + - key: node-role.kubernetes.io/master + effect: NoSchedule + - key: node-role.kubernetes.io/edge + effect: NoSchedule diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/namespace.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/namespace.yaml new file mode 100644 index 000000000000..813251cf8ee2 --- /dev/null +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/namespace.yaml @@ -0,0 +1,11 @@ +kind: Namespace +apiVersion: v1 +metadata: + generateName: e2e-disruption-monitor- + labels: + pod-security.kubernetes.io/enforce: privileged + pod-security.kubernetes.io/audit: privileged + pod-security.kubernetes.io/warn: privileged + apiserver.openshift.io/incluster-disruption: "true" + annotations: + workload.openshift.io/allowed: management diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/rb-monitor.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/rb-monitor.yaml new file mode 100644 index 000000000000..e40e6ca2657c --- /dev/null +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/rb-monitor.yaml @@ -0,0 +1,11 @@ +apiVersion: rbac.authorization.k8s.io/v1 +kind: RoleBinding +metadata: + generateName: e2e-disruption-monitor +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: Role + name: e2e-disruption-monitor +subjects: +- kind: ServiceAccount + name: disruption-monitor-sa diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/role-monitor.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/role-monitor.yaml new file mode 100644 index 000000000000..861427899968 --- /dev/null +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/role-monitor.yaml @@ -0,0 +1,16 @@ +kind: Role +apiVersion: rbac.authorization.k8s.io/v1 +metadata: + name: e2e-disruption-monitor +rules: + - verbs: + - create + - get + - list + - update + - delete + - watch + apiGroups: + - '' + resources: + - configmaps diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/serviceaccount.yaml b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/serviceaccount.yaml new file mode 100644 index 000000000000..528e6fd9556e --- 
/dev/null +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/manifests/serviceaccount.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: disruption-monitor-sa diff --git a/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/monitortest.go b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/monitortest.go new file mode 100644 index 000000000000..b0519f7abb59 --- /dev/null +++ b/pkg/monitortests/kubeapiserver/disruptioninclusterapiserver/monitortest.go @@ -0,0 +1,490 @@ +package disruptioninclusterapiserver + +import ( + "bytes" + "context" + _ "embed" + "fmt" + "net/url" + "os/exec" + "strings" + "time" + + "github.com/google/uuid" + "github.com/sirupsen/logrus" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + + "github.com/openshift/origin/pkg/monitortestlibrary/disruptionlibrary" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + + corev1 "k8s.io/api/core/v1" + + exutil "github.com/openshift/origin/test/extended/util" + + configclient "github.com/openshift/client-go/config/clientset/versioned" + "github.com/openshift/library-go/pkg/operator/resource/resourceread" + "github.com/openshift/origin/pkg/monitor/monitorapi" + "github.com/openshift/origin/pkg/monitortestframework" + "github.com/openshift/origin/pkg/test/ginkgo/junitapi" + appsv1 "k8s.io/api/apps/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/cache" + watchtools "k8s.io/client-go/tools/watch" +) + +var ( + //go:embed manifests/namespace.yaml + namespaceYaml []byte + //go:embed manifests/crb-privileged.yaml + rbacPrivilegedYaml []byte + //go:embed manifests/clusterrole-monitor.yaml + rbacMonitorClusterRoleYaml []byte + //go:embed manifests/role-monitor.yaml + rbacMonitorRoleYaml []byte + //go:embed manifests/rb-monitor.yaml + rbacMonitorRBYaml []byte + //go:embed manifests/crb-monitor.yaml + rbacListOauthClientCRBYaml []byte + //go:embed manifests/serviceaccount.yaml + serviceAccountYaml []byte + //go:embed manifests/dep-internal-lb.yaml + internalLBDeploymentYaml []byte + //go:embed manifests/dep-service-network.yaml + serviceNetworkDeploymentYaml []byte + //go:embed manifests/dep-localhost.yaml + localhostDeploymentYaml []byte + rbacPrivilegedCRBName string + rbacMonitorClusterRoleName string + rbacMonitorCRBName string +) + +type InvariantInClusterDisruption struct { + namespaceName string + openshiftTestsImagePullSpec string + payloadImagePullSpec string + notSupportedReason string + allNodes int32 + controlPlaneNodes int32 + + adminRESTConfig *rest.Config + kubeClient kubernetes.Interface +} + +func NewInvariantInClusterDisruption(info monitortestframework.MonitorTestInitializationInfo) monitortestframework.MonitorTest { + return &InvariantInClusterDisruption{ + payloadImagePullSpec: info.UpgradeTargetPayloadImagePullSpec, + } +} + +func (i *InvariantInClusterDisruption) createDeploymentAndWaitToRollout(ctx context.Context, deploymentObj *appsv1.Deployment) error { + deploymentID := uuid.New().String() + deploymentObj = disruptionlibrary.UpdateDeploymentENVs(deploymentObj, deploymentID, "") + + client := i.kubeClient.AppsV1().Deployments(deploymentObj.Namespace) + _, err := client.Create(ctx, deploymentObj, metav1.CreateOptions{}) + if err != nil { + return fmt.Errorf("error creating deployment %s: %v", deploymentObj.Namespace, err) + } 
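+ // wait up to five minutes for the poller deployment to report all replicas available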
+ + timeLimitedCtx, cancel := context.WithTimeout(ctx, 5*time.Minute) + defer cancel() + + if _, watchErr := watchtools.UntilWithSync(timeLimitedCtx, + cache.NewListWatchFromClient( + i.kubeClient.AppsV1().RESTClient(), "deployments", deploymentObj.Namespace, fields.OneTermEqualSelector("metadata.name", deploymentObj.Name)), + &appsv1.Deployment{}, + nil, + func(event watch.Event) (bool, error) { + deployment := event.Object.(*appsv1.Deployment) + return deployment.Status.AvailableReplicas == deployment.Status.Replicas, nil + }, + ); watchErr != nil { + return fmt.Errorf("deployment %s didn't roll out: %v", deploymentObj.Name, watchErr) + } + return nil +} + +func (i *InvariantInClusterDisruption) createInternalLBDeployment(ctx context.Context, apiIntHost string) error { + deploymentObj := resourceread.ReadDeploymentV1OrDie(internalLBDeploymentYaml) + deploymentObj.SetNamespace(i.namespaceName) + deploymentObj.Spec.Template.Spec.Containers[0].Env[0].Value = apiIntHost + // set amount of deployment replicas to make sure it runs on all nodes + deploymentObj.Spec.Replicas = &i.allNodes + // we need to use the openshift-tests image of the destination during an upgrade. + deploymentObj.Spec.Template.Spec.Containers[0].Image = i.openshiftTestsImagePullSpec + + return i.createDeploymentAndWaitToRollout(ctx, deploymentObj) +} + +func (i *InvariantInClusterDisruption) createServiceNetworkDeployment(ctx context.Context) error { + deploymentObj := resourceread.ReadDeploymentV1OrDie(serviceNetworkDeploymentYaml) + deploymentObj.SetNamespace(i.namespaceName) + // set amount of deployment replicas to make sure it runs on all nodes + deploymentObj.Spec.Replicas = &i.allNodes + // we need to use the openshift-tests image of the destination during an upgrade. + deploymentObj.Spec.Template.Spec.Containers[0].Image = i.openshiftTestsImagePullSpec + + return i.createDeploymentAndWaitToRollout(ctx, deploymentObj) +} + +func (i *InvariantInClusterDisruption) createLocalhostDeployment(ctx context.Context) error { + // Don't start localhost deployment on hypershift + if i.controlPlaneNodes == 0 { + return nil + } + + deploymentObj := resourceread.ReadDeploymentV1OrDie(localhostDeploymentYaml) + deploymentObj.SetNamespace(i.namespaceName) + // set amount of deployment replicas to make sure it runs on control plane nodes + deploymentObj.Spec.Replicas = &i.controlPlaneNodes + // we need to use the openshift-tests image of the destination during an upgrade. 
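+ // the localhost poller authenticates via the node's static kubeconfig (KUBECONFIG=/kubeconfigs/localhost.kubeconfig in the manifest), so replicas are pinned to control-plane nodes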
+ deploymentObj.Spec.Template.Spec.Containers[0].Image = i.openshiftTestsImagePullSpec + + return i.createDeploymentAndWaitToRollout(ctx, deploymentObj) +} + +func (i *InvariantInClusterDisruption) createRBACPrivileged(ctx context.Context) error { + rbacPrivilegedObj := resourceread.ReadClusterRoleBindingV1OrDie(rbacPrivilegedYaml) + rbacPrivilegedObj.Subjects[0].Namespace = i.namespaceName + + client := i.kubeClient.RbacV1().ClusterRoleBindings() + obj, err := client.Create(ctx, rbacPrivilegedObj, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("error creating privileged SCC CRB: %v", err) + } + rbacPrivilegedCRBName = obj.Name + return nil +} + +func (i *InvariantInClusterDisruption) createMonitorClusterRole(ctx context.Context) error { + rbacMonitorRoleObj := resourceread.ReadClusterRoleV1OrDie(rbacMonitorClusterRoleYaml) + + client := i.kubeClient.RbacV1().ClusterRoles() + _, err := client.Create(ctx, rbacMonitorRoleObj, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("error creating monitor cluster role: %v", err) + } + rbacMonitorClusterRoleName = rbacMonitorRoleObj.Name + return nil +} + +func (i *InvariantInClusterDisruption) createMonitorRole(ctx context.Context) error { + rbacMonitorRoleObj := resourceread.ReadRoleV1OrDie(rbacMonitorRoleYaml) + rbacMonitorRoleObj.Namespace = i.namespaceName + + client := i.kubeClient.RbacV1().Roles(i.namespaceName) + _, err := client.Create(ctx, rbacMonitorRoleObj, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("error creating monitor role: %v", err) + } + return nil +} + +func (i *InvariantInClusterDisruption) createMonitorCRB(ctx context.Context) error { + rbacMonitorCRBObj := resourceread.ReadClusterRoleBindingV1OrDie(rbacListOauthClientCRBYaml) + rbacMonitorCRBObj.Subjects[0].Namespace = i.namespaceName + + client := i.kubeClient.RbacV1().ClusterRoleBindings() + obj, err := client.Create(ctx, rbacMonitorCRBObj, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("error creating monitor CRB: %v", err) + } + rbacMonitorCRBName = obj.Name + return nil +} + +func (i *InvariantInClusterDisruption) createMonitorRB(ctx context.Context) error { + rbacMonitorRBObj := resourceread.ReadRoleBindingV1OrDie(rbacMonitorRBYaml) + rbacMonitorRBObj.Subjects[0].Namespace = i.namespaceName + + client := i.kubeClient.RbacV1().RoleBindings(i.namespaceName) + // the namespaced rolebinding is removed together with the namespace, so its generated name does not need to be tracked (and must not overwrite the CRB name above) + _, err := client.Create(ctx, rbacMonitorRBObj, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("error creating monitor RB: %v", err) + } + return nil +} + +func (i *InvariantInClusterDisruption) createServiceAccount(ctx context.Context) error { + serviceAccountObj := resourceread.ReadServiceAccountV1OrDie(serviceAccountYaml) + serviceAccountObj.SetNamespace(i.namespaceName) + + client := i.kubeClient.CoreV1().ServiceAccounts(i.namespaceName) + _, err := client.Create(ctx, serviceAccountObj, metav1.CreateOptions{}) + if err != nil && !apierrors.IsAlreadyExists(err) { + return fmt.Errorf("error creating service account: %v", err) + } + return nil +} + +func (i *InvariantInClusterDisruption) createNamespace(ctx context.Context) (string, error) { + log := logrus.WithField("monitorTest", "apiserver-incluster-availability").WithField("namespace", i.namespaceName).WithField("func", "createNamespace") + + namespaceObj :=
+
+	client := i.kubeClient.CoreV1().Namespaces()
+	actualNamespace, err := client.Create(ctx, namespaceObj, metav1.CreateOptions{})
+	if err != nil && !apierrors.IsAlreadyExists(err) {
+		return "", fmt.Errorf("error creating namespace: %v", err)
+	}
+	log.Infof("created namespace %s", actualNamespace.Name)
+	return actualNamespace.Name, nil
+}
+
+func (i *InvariantInClusterDisruption) removeExistingMonitorNamespaces(ctx context.Context) error {
+	log := logrus.WithField("monitorTest", "apiserver-incluster-availability").WithField("namespace", i.namespaceName).WithField("func", "removeExistingMonitorNamespaces")
+	namespaces, err := i.kubeClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{
+		LabelSelector: labels.Set{"apiserver.openshift.io/incluster-disruption": "true"}.AsSelector().String(),
+	})
+	if err != nil {
+		log.Infof("error listing monitor namespaces: %v", err)
+		return err
+	}
+	for _, ns := range namespaces.Items {
+		if err := i.deleteNamespace(ctx, ns.Name); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (i *InvariantInClusterDisruption) deleteNamespace(ctx context.Context, name string) error {
+	log := logrus.WithField("monitorTest", "apiserver-incluster-availability").WithField("namespace", name).WithField("func", "deleteNamespace")
+	log.Infof("removing monitoring namespace")
+	nsClient := i.kubeClient.CoreV1().Namespaces()
+	err := nsClient.Delete(ctx, name, metav1.DeleteOptions{})
+	if err != nil && !apierrors.IsNotFound(err) {
+		return fmt.Errorf("error removing namespace %s: %v", name, err)
+	}
+	if !apierrors.IsNotFound(err) {
+		log.Infof("Namespace %s removed", name)
+	}
+	return nil
+}
+
+func (i *InvariantInClusterDisruption) StartCollection(ctx context.Context, adminRESTConfig *rest.Config, _ monitorapi.RecorderWriter) error {
+	var err error
+	log := logrus.WithField("monitorTest", "apiserver-incluster-availability").WithField("namespace", i.namespaceName).WithField("func", "StartCollection")
+	log.Infof("payload image pull spec is %v", i.payloadImagePullSpec)
+	if len(i.payloadImagePullSpec) == 0 {
+		configClient, err := configclient.NewForConfig(adminRESTConfig)
+		if err != nil {
+			return err
+		}
+		clusterVersion, err := configClient.ConfigV1().ClusterVersions().Get(ctx, "version", metav1.GetOptions{})
+		if apierrors.IsNotFound(err) {
+			i.notSupportedReason = "clusterversion/version not found and no image pull spec specified."
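+			// returning nil (with notSupportedReason set) marks the test as not supported rather than failed; CollectData and Cleanup check this and skip their work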
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+		i.payloadImagePullSpec = clusterVersion.Status.History[0].Image
+	}
+
+	// resolve the openshift-tests image pull spec from the release payload
+	cmd := exec.Command("oc", "adm", "release", "info", i.payloadImagePullSpec, "--image-for=tests")
+	out := &bytes.Buffer{}
+	errOut := &bytes.Buffer{}
+	cmd.Stdout = out
+	cmd.Stderr = errOut
+	if err := cmd.Run(); err != nil {
+		i.notSupportedReason = fmt.Sprintf("unable to determine openshift-tests image: %v: %v", err, errOut.String())
+		return nil
+	}
+	i.openshiftTestsImagePullSpec = strings.TrimSpace(out.String())
+	log.Infof("openshift-tests image pull spec is %v", i.openshiftTestsImagePullSpec)
+
+	i.adminRESTConfig = adminRESTConfig
+	i.kubeClient, err = kubernetes.NewForConfig(i.adminRESTConfig)
+	if err != nil {
+		return fmt.Errorf("error constructing kube client: %v", err)
+	}
+
+	if ok, _ := exutil.IsMicroShiftCluster(i.kubeClient); ok {
+		i.notSupportedReason = "microshift clusters don't have load balancers"
+		log.Infof("IsMicroShiftCluster: %s", i.notSupportedReason)
+		return nil
+	}
+
+	// remove any monitor namespaces left behind by an earlier test run
+	if err := i.removeExistingMonitorNamespaces(ctx); err != nil {
+		log.Infof("removeExistingMonitorNamespaces returned error %v", err)
+		return err
+	}
+
+	log.Infof("starting monitoring deployments")
+	configClient, err := configclient.NewForConfig(i.adminRESTConfig)
+	if err != nil {
+		return fmt.Errorf("error constructing openshift config client: %v", err)
+	}
+	infra, err := configClient.ConfigV1().Infrastructures().Get(ctx, "cluster", metav1.GetOptions{})
+	if err != nil {
+		return fmt.Errorf("error getting openshift infrastructure: %v", err)
+	}
+
+	internalAPI, err := url.Parse(infra.Status.APIServerInternalURL)
+	if err != nil {
+		return fmt.Errorf("error parsing api int url: %v", err)
+	}
+	apiIntHost := internalAPI.Hostname()
+
+	allNodes, err := i.kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
+	if err != nil {
+		return fmt.Errorf("error getting nodes: %v", err)
+	}
+	i.allNodes = int32(len(allNodes.Items))
+
+	controlPlaneNodes, err := i.kubeClient.CoreV1().Nodes().List(ctx, metav1.ListOptions{
+		LabelSelector: labels.Set{"node-role.kubernetes.io/master": ""}.AsSelector().String(),
+	})
+	if err != nil {
+		return fmt.Errorf("error getting control plane nodes: %v", err)
+	}
+	i.controlPlaneNodes = int32(len(controlPlaneNodes.Items))
+
+	namespace, err := i.createNamespace(ctx)
+	if err != nil {
+		return fmt.Errorf("error creating namespace: %v", err)
+	}
+	i.namespaceName = namespace
+
+	err = i.createServiceAccount(ctx)
+	if err != nil {
+		return fmt.Errorf("error creating service account: %v", err)
+	}
+	err = i.createRBACPrivileged(ctx)
+	if err != nil {
+		return fmt.Errorf("error creating privileged SCC rolebinding: %v", err)
+	}
+	err = i.createMonitorClusterRole(ctx)
+	if err != nil {
+		return fmt.Errorf("error creating monitor cluster role: %v", err)
+	}
+	err = i.createMonitorCRB(ctx)
+	if err != nil {
+		return fmt.Errorf("error creating monitor cluster rolebinding: %v", err)
+	}
+	err = i.createMonitorRole(ctx)
+	if err != nil {
+		return fmt.Errorf("error creating monitor role: %v", err)
+	}
+	err = i.createMonitorRB(ctx)
+	if err != nil {
+		return fmt.Errorf("error creating monitor rolebinding: %v", err)
+	}
+	err = i.createServiceNetworkDeployment(ctx)
+	if err != nil {
+		return fmt.Errorf("error creating service network deployment: %v", err)
+	}
+	err = i.createLocalhostDeployment(ctx)
+	if err != nil {
+		return fmt.Errorf("error creating localhost deployment: %v", err)
+	}
+	err = i.createInternalLBDeployment(ctx, apiIntHost)
+	if err != nil {
+		return fmt.Errorf("error creating internal LB deployment: %v", err)
+	}
+	log.Infof("monitoring deployments started")
+	return nil
+}
+
+func (i *InvariantInClusterDisruption) CollectData(ctx context.Context, storageDir string, beginning time.Time, end time.Time) (monitorapi.Intervals, []*junitapi.JUnitTestCase, error) {
+	log := logrus.WithField("monitorTest", "apiserver-incluster-availability").WithField("namespace", i.namespaceName).WithField("func", "CollectData")
+
+	if len(i.notSupportedReason) > 0 {
+		return nil, nil, nil
+	}
+
+	log.Infof("creating stop-collecting configmap")
+
+	// create the stop-collecting configmap and wait 30s for the pollers to stop; the 30s is just a guess
+	if _, err := i.kubeClient.CoreV1().ConfigMaps(i.namespaceName).Create(ctx, &corev1.ConfigMap{
+		ObjectMeta: metav1.ObjectMeta{Name: "stop-collecting"},
+	}, metav1.CreateOptions{}); err != nil {
+		return nil, nil, err
+	}
+
+	// TODO create back-pressure on the configmap
+	select {
+	case <-time.After(30 * time.Second):
+	case <-ctx.Done():
+		return nil, nil, ctx.Err()
+	}
+
+	log.Infof("collecting data from the deployments")
+
+	pollerLabel, err := labels.NewRequirement("apiserver.openshift.io/disruption-actor", selection.Equals, []string{"poller"})
+	if err != nil {
+		return nil, nil, err
+	}
+
+	intervals, junits, errs := disruptionlibrary.CollectIntervalsForPods(ctx, i.kubeClient, "Jira: \"kube-apiserver\"", i.namespaceName, labels.NewSelector().Add(*pollerLabel))
+	log.Infof("intervals collected")
+	return intervals, junits, utilerrors.NewAggregate(errs)
+}
+
+func (i *InvariantInClusterDisruption) ConstructComputedIntervals(ctx context.Context, startingIntervals monitorapi.Intervals, _ monitorapi.ResourcesMap, beginning time.Time, end time.Time) (constructedIntervals monitorapi.Intervals, err error) {
+	return nil, nil
+}
+
+func (i *InvariantInClusterDisruption) EvaluateTestsFromConstructedIntervals(ctx context.Context, finalIntervals monitorapi.Intervals) ([]*junitapi.JUnitTestCase, error) {
+	return nil, nil
+}
+
+func (i *InvariantInClusterDisruption) WriteContentToStorage(ctx context.Context, storageDir, timeSuffix string, finalIntervals monitorapi.Intervals, finalResourceState monitorapi.ResourcesMap) error {
+	return nil
+}
+
+func (i *InvariantInClusterDisruption) Cleanup(ctx context.Context) error {
+	log := logrus.WithField("monitorTest", "apiserver-incluster-availability").WithField("namespace", i.namespaceName).WithField("func", "Cleanup")
+	if len(i.notSupportedReason) > 0 {
+		return nil
+	}
+
+	log.Infof("removing monitoring namespace")
+	nsClient := i.kubeClient.CoreV1().Namespaces()
+	err := nsClient.Delete(ctx, i.namespaceName, metav1.DeleteOptions{})
+	if err != nil && !apierrors.IsNotFound(err) {
+		return fmt.Errorf("error removing namespace %s: %v", i.namespaceName, err)
+	}
+	if !apierrors.IsNotFound(err) {
+		log.Infof("Namespace %s removed", i.namespaceName)
+	}
+
+	log.Infof("removing monitoring cluster roles and bindings")
+	crbClient := i.kubeClient.RbacV1().ClusterRoleBindings()
+	err = crbClient.Delete(ctx, rbacPrivilegedCRBName, metav1.DeleteOptions{})
+	if err != nil && !apierrors.IsNotFound(err) {
+		return fmt.Errorf("error removing privileged CRB: %v", err)
+	}
+	if !apierrors.IsNotFound(err) {
+		log.Infof("CRB %s removed", rbacPrivilegedCRBName)
+	}
+
+	err = crbClient.Delete(ctx, rbacMonitorCRBName, metav1.DeleteOptions{})
+	if err != nil && !apierrors.IsNotFound(err) {
fmt.Errorf("error removing monitor CRB: %v", err) + } + if !apierrors.IsNotFound(err) { + log.Infof("CRB %s removed", rbacMonitorCRBName) + } + + rolesClient := i.kubeClient.RbacV1().ClusterRoles() + err = rolesClient.Delete(ctx, rbacMonitorClusterRoleName, metav1.DeleteOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return fmt.Errorf("error removing monitor role: %v", err) + } + if !apierrors.IsNotFound(err) { + log.Infof("Role %s removed", rbacMonitorClusterRoleName) + } + log.Infof("collect data completed") + return nil +} diff --git a/pkg/monitortests/kubeapiserver/disruptionlegacyapiservers/known_backends.go b/pkg/monitortests/kubeapiserver/disruptionlegacyapiservers/known_backends.go deleted file mode 100644 index b91665ad3f01..000000000000 --- a/pkg/monitortests/kubeapiserver/disruptionlegacyapiservers/known_backends.go +++ /dev/null @@ -1,20 +0,0 @@ -package disruptionlegacyapiservers - -import ( - "fmt" - - "github.com/openshift/origin/pkg/monitor/backenddisruption" - "github.com/openshift/origin/pkg/monitor/monitorapi" - "k8s.io/client-go/rest" -) - -func createAPIServerBackendSampler(clusterConfig *rest.Config, disruptionBackendName, url string, connectionType monitorapi.BackendConnectionType) (*backenddisruption.BackendSampler, error) { - // default gets auto-created, so this should always exist - backendSampler, err := backenddisruption.NewAPIServerBackend(clusterConfig, disruptionBackendName, url, connectionType) - if err != nil { - return nil, err - } - backendSampler = backendSampler.WithUserAgent(fmt.Sprintf("openshift-external-backend-sampler-%s-%s", connectionType, disruptionBackendName)) - - return backendSampler, nil -} diff --git a/pkg/monitortests/kubeapiserver/disruptionlegacyapiservers/monitortest.go b/pkg/monitortests/kubeapiserver/disruptionlegacyapiservers/monitortest.go deleted file mode 100644 index d959597dbf0d..000000000000 --- a/pkg/monitortests/kubeapiserver/disruptionlegacyapiservers/monitortest.go +++ /dev/null @@ -1,288 +0,0 @@ -package disruptionlegacyapiservers - -import ( - "context" - _ "embed" - "fmt" - "time" - - "github.com/openshift/origin/pkg/monitortestframework" - - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - utilerrors "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/rest" - - "github.com/openshift/origin/pkg/monitor/monitorapi" - "github.com/openshift/origin/pkg/monitortestlibrary/disruptionlibrary" - "github.com/openshift/origin/pkg/test/ginkgo/junitapi" -) - -type availability struct { - disruptionCheckers []*disruptionlibrary.Availability - - notSupportedReason error - suppressJunit bool -} - -func NewAvailabilityInvariant() monitortestframework.MonitorTest { - return &availability{} -} - -func NewRecordAvailabilityOnly() monitortestframework.MonitorTest { - return &availability{ - suppressJunit: true, - } -} -func testNames(owner, disruptionBackendName string) (string, string) { - return fmt.Sprintf("[%s] disruption/%s connection/new should be available throughout the test", owner, disruptionBackendName), - fmt.Sprintf("[%s] disruption/%s connection/reused should be available throughout the test", owner, disruptionBackendName) -} - -func newDisruptionCheckerForKubeAPI(adminRESTConfig *rest.Config) (*disruptionlibrary.Availability, error) { - disruptionBackedName := "kube-api" - newConnectionTestName, reusedConnectionTestName := testNames("sig-api-machinery", disruptionBackedName) - newConnections, err := 
createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/api/v1/namespaces/default", monitorapi.NewConnectionType) - if err != nil { - return nil, err - } - reusedConnections, err := createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/api/v1/namespaces/default", monitorapi.ReusedConnectionType) - if err != nil { - return nil, err - } - return disruptionlibrary.NewAvailabilityInvariant( - newConnectionTestName, reusedConnectionTestName, - newConnections, reusedConnections, - ), nil -} - -func newDisruptionCheckerForKubeAPICached(adminRESTConfig *rest.Config) (*disruptionlibrary.Availability, error) { - // by setting resourceVersion="0" we instruct the server to get the data from the memory cache and avoid contacting with the etcd. - - disruptionBackedName := "cache-kube-api" - newConnectionTestName, reusedConnectionTestName := testNames("sig-api-machinery", disruptionBackedName) - newConnections, err := createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/api/v1/namespaces/default?resourceVersion=0", monitorapi.NewConnectionType) - if err != nil { - return nil, err - } - reusedConnections, err := createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/api/v1/namespaces/default?resourceVersion=0", monitorapi.ReusedConnectionType) - if err != nil { - return nil, err - } - return disruptionlibrary.NewAvailabilityInvariant( - newConnectionTestName, reusedConnectionTestName, - newConnections, reusedConnections, - ), nil -} - -func newDisruptionCheckerForOpenshiftAPI(adminRESTConfig *rest.Config) (*disruptionlibrary.Availability, error) { - disruptionBackedName := "openshift-api" - newConnectionTestName, reusedConnectionTestName := testNames("sig-api-machinery", disruptionBackedName) - newConnections, err := createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/apis/image.openshift.io/v1/namespaces/default/imagestreams", monitorapi.NewConnectionType) - if err != nil { - return nil, err - } - reusedConnections, err := createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/apis/image.openshift.io/v1/namespaces/default/imagestreams", monitorapi.ReusedConnectionType) - if err != nil { - return nil, err - } - return disruptionlibrary.NewAvailabilityInvariant( - newConnectionTestName, reusedConnectionTestName, - newConnections, reusedConnections, - ), nil -} - -func newDisruptionCheckerForOpenshiftAPICached(adminRESTConfig *rest.Config) (*disruptionlibrary.Availability, error) { - // by setting resourceVersion="0" we instruct the server to get the data from the memory cache and avoid contacting with the etcd. 
- - disruptionBackedName := "cache-openshift-api" - newConnectionTestName, reusedConnectionTestName := testNames("sig-api-machinery", disruptionBackedName) - newConnections, err := createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/apis/image.openshift.io/v1/namespaces/default/imagestreams?resourceVersion=0", monitorapi.NewConnectionType) - if err != nil { - return nil, err - } - reusedConnections, err := createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/apis/image.openshift.io/v1/namespaces/default/imagestreams?resourceVersion=0", monitorapi.ReusedConnectionType) - if err != nil { - return nil, err - } - return disruptionlibrary.NewAvailabilityInvariant( - newConnectionTestName, reusedConnectionTestName, - newConnections, reusedConnections, - ), nil -} - -func newDisruptionCheckerForOAuthAPI(adminRESTConfig *rest.Config) (*disruptionlibrary.Availability, error) { - disruptionBackedName := "oauth-api" - newConnectionTestName, reusedConnectionTestName := testNames("sig-api-machinery", disruptionBackedName) - newConnections, err := createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/apis/oauth.openshift.io/v1/oauthclients", monitorapi.NewConnectionType) - if err != nil { - return nil, err - } - reusedConnections, err := createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/apis/oauth.openshift.io/v1/oauthclients", monitorapi.ReusedConnectionType) - if err != nil { - return nil, err - } - return disruptionlibrary.NewAvailabilityInvariant( - newConnectionTestName, reusedConnectionTestName, - newConnections, reusedConnections, - ), nil -} - -func newDisruptionCheckerForOAuthCached(adminRESTConfig *rest.Config) (*disruptionlibrary.Availability, error) { - // by setting resourceVersion="0" we instruct the server to get the data from the memory cache and avoid contacting with the etcd. 
- - disruptionBackedName := "cache-oauth-api" - newConnectionTestName, reusedConnectionTestName := testNames("sig-api-machinery", disruptionBackedName) - newConnections, err := createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/apis/oauth.openshift.io/v1/oauthclients?resourceVersion=0", monitorapi.NewConnectionType) - if err != nil { - return nil, err - } - reusedConnections, err := createAPIServerBackendSampler(adminRESTConfig, disruptionBackedName, "/apis/oauth.openshift.io/v1/oauthclients?resourceVersion=0", monitorapi.ReusedConnectionType) - if err != nil { - return nil, err - } - return disruptionlibrary.NewAvailabilityInvariant( - newConnectionTestName, reusedConnectionTestName, - newConnections, reusedConnections, - ), nil -} - -func (w *availability) StartCollection(ctx context.Context, adminRESTConfig *rest.Config, recorder monitorapi.RecorderWriter) error { - var err error - - kubeClient, err := kubernetes.NewForConfig(adminRESTConfig) - if err != nil { - return err - } - - _, err = kubeClient.CoreV1().Namespaces().Get(context.Background(), "openshift-apiserver", metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - w.notSupportedReason = &monitortestframework.NotSupportedError{ - Reason: "namespace openshift-apiserver not present", - } - return w.notSupportedReason - } - if err != nil { - return err - } - _, err = kubeClient.CoreV1().Namespaces().Get(context.Background(), "openshift-oauth-apiserver", metav1.GetOptions{}) - if apierrors.IsNotFound(err) { - w.notSupportedReason = &monitortestframework.NotSupportedError{ - Reason: "namespace openshift-oauth-apiserver not present", - } - return w.notSupportedReason - } - if err != nil { - return err - } - - var curr *disruptionlibrary.Availability - - curr, err = newDisruptionCheckerForKubeAPI(adminRESTConfig) - if err != nil { - return err - } - w.disruptionCheckers = append(w.disruptionCheckers, curr) - curr, err = newDisruptionCheckerForKubeAPICached(adminRESTConfig) - if err != nil { - return err - } - w.disruptionCheckers = append(w.disruptionCheckers, curr) - - curr, err = newDisruptionCheckerForOpenshiftAPI(adminRESTConfig) - if err != nil { - return err - } - w.disruptionCheckers = append(w.disruptionCheckers, curr) - curr, err = newDisruptionCheckerForOpenshiftAPICached(adminRESTConfig) - if err != nil { - return err - } - w.disruptionCheckers = append(w.disruptionCheckers, curr) - - curr, err = newDisruptionCheckerForOAuthAPI(adminRESTConfig) - if err != nil { - return err - } - w.disruptionCheckers = append(w.disruptionCheckers, curr) - curr, err = newDisruptionCheckerForOAuthCached(adminRESTConfig) - if err != nil { - return err - } - w.disruptionCheckers = append(w.disruptionCheckers, curr) - - for i := range w.disruptionCheckers { - if err := w.disruptionCheckers[i].StartCollection(ctx, adminRESTConfig, recorder); err != nil { - return err - } - } - - return nil -} - -func (w *availability) CollectData(ctx context.Context, storageDir string, beginning, end time.Time) (monitorapi.Intervals, []*junitapi.JUnitTestCase, error) { - if w.notSupportedReason != nil { - return nil, nil, w.notSupportedReason - } - - intervals := monitorapi.Intervals{} - junits := []*junitapi.JUnitTestCase{} - errs := []error{} - - for i := range w.disruptionCheckers { - // we failed and indicated it during setup. - if w.disruptionCheckers[i] == nil { - continue - } - - localIntervals, localJunits, localErr := w.disruptionCheckers[i].CollectData(ctx) - intervals = append(intervals, localIntervals...) 
- junits = append(junits, localJunits...) - if localErr != nil { - errs = append(errs, localErr) - } - } - - return intervals, junits, utilerrors.NewAggregate(errs) -} - -func (w *availability) ConstructComputedIntervals(ctx context.Context, startingIntervals monitorapi.Intervals, recordedResources monitorapi.ResourcesMap, beginning, end time.Time) (monitorapi.Intervals, error) { - return nil, w.notSupportedReason -} - -func (w *availability) EvaluateTestsFromConstructedIntervals(ctx context.Context, finalIntervals monitorapi.Intervals) ([]*junitapi.JUnitTestCase, error) { - if w.notSupportedReason != nil { - return nil, w.notSupportedReason - } - - if w.suppressJunit { - return nil, nil - } - - junits := []*junitapi.JUnitTestCase{} - errs := []error{} - - for i := range w.disruptionCheckers { - // we failed and indicated it during setup. - if w.disruptionCheckers[i] == nil { - continue - } - - localJunits, localErr := w.disruptionCheckers[i].EvaluateTestsFromConstructedIntervals(ctx, finalIntervals) - junits = append(junits, localJunits...) - if localErr != nil { - errs = append(errs, localErr) - } - } - - return junits, utilerrors.NewAggregate(errs) -} - -func (w *availability) WriteContentToStorage(ctx context.Context, storageDir, timeSuffix string, finalIntervals monitorapi.Intervals, finalResourceState monitorapi.ResourcesMap) error { - return w.notSupportedReason -} - -func (w *availability) Cleanup(ctx context.Context) error { - return w.notSupportedReason -} diff --git a/pkg/monitortests/kubeapiserver/disruptionnewapiserver/monitortest.go b/pkg/monitortests/kubeapiserver/disruptionnewapiserver/monitortest.go index 25690915f7f3..7f12282d1bcd 100644 --- a/pkg/monitortests/kubeapiserver/disruptionnewapiserver/monitortest.go +++ b/pkg/monitortests/kubeapiserver/disruptionnewapiserver/monitortest.go @@ -71,9 +71,5 @@ func (w *newAPIServerDisruptionChecker) WriteContentToStorage(ctx context.Contex } func (w *newAPIServerDisruptionChecker) Cleanup(ctx context.Context) error { - if w.notSupportedReason != nil { - return w.notSupportedReason - } - return nil } diff --git a/pkg/monitortests/network/disruptionpodnetwork/monitortest.go b/pkg/monitortests/network/disruptionpodnetwork/monitortest.go index d9f5c460597f..33013f9112cf 100644 --- a/pkg/monitortests/network/disruptionpodnetwork/monitortest.go +++ b/pkg/monitortests/network/disruptionpodnetwork/monitortest.go @@ -1,13 +1,10 @@ package disruptionpodnetwork import ( - "bufio" - "bytes" "context" "embed" _ "embed" "fmt" - "strings" "time" "github.com/google/uuid" @@ -28,8 +25,8 @@ import ( k8simage "k8s.io/kubernetes/test/utils/image" "github.com/openshift/origin/pkg/monitor/monitorapi" - monitorserialization "github.com/openshift/origin/pkg/monitor/serialization" "github.com/openshift/origin/pkg/monitortestframework" + "github.com/openshift/origin/pkg/monitortestlibrary/disruptionlibrary" "github.com/openshift/origin/pkg/test/ginkgo/junitapi" "github.com/openshift/origin/test/extended/util" "github.com/openshift/origin/test/extended/util/image" @@ -91,18 +88,6 @@ func NewPodNetworkAvalibilityInvariant(info monitortestframework.MonitorTestInit } } -func updateDeploymentENVs(deployment *appsv1.Deployment, deploymentID, serviceClusterIP string) *appsv1.Deployment { - for i, env := range deployment.Spec.Template.Spec.Containers[0].Env { - if env.Name == "DEPLOYMENT_ID" { - deployment.Spec.Template.Spec.Containers[0].Env[i].Value = deploymentID - } else if env.Name == "SERVICE_CLUSTER_IP" && len(serviceClusterIP) > 0 { - 
deployment.Spec.Template.Spec.Containers[0].Env[i].Value = serviceClusterIP - } - } - - return deployment -} - func (pna *podNetworkAvalibility) StartCollection(ctx context.Context, adminRESTConfig *rest.Config, recorder monitorapi.RecorderWriter) error { deploymentID := uuid.New().String() @@ -144,25 +129,25 @@ func (pna *podNetworkAvalibility) StartCollection(ctx context.Context, adminREST podNetworkToPodNetworkPollerDeployment.Spec.Replicas = &numNodes podNetworkToPodNetworkPollerDeployment.Spec.Template.Spec.Containers[0].Image = openshiftTestsImagePullSpec - podNetworkToPodNetworkPollerDeployment = updateDeploymentENVs(podNetworkToPodNetworkPollerDeployment, deploymentID, "") + podNetworkToPodNetworkPollerDeployment = disruptionlibrary.UpdateDeploymentENVs(podNetworkToPodNetworkPollerDeployment, deploymentID, "") if _, err = pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), podNetworkToPodNetworkPollerDeployment, metav1.CreateOptions{}); err != nil { return err } podNetworkToHostNetworkPollerDeployment.Spec.Replicas = &numNodes podNetworkToHostNetworkPollerDeployment.Spec.Template.Spec.Containers[0].Image = openshiftTestsImagePullSpec - podNetworkToHostNetworkPollerDeployment = updateDeploymentENVs(podNetworkToHostNetworkPollerDeployment, deploymentID, "") + podNetworkToHostNetworkPollerDeployment = disruptionlibrary.UpdateDeploymentENVs(podNetworkToHostNetworkPollerDeployment, deploymentID, "") if _, err = pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), podNetworkToHostNetworkPollerDeployment, metav1.CreateOptions{}); err != nil { return err } hostNetworkToPodNetworkPollerDeployment.Spec.Replicas = &numNodes hostNetworkToPodNetworkPollerDeployment.Spec.Template.Spec.Containers[0].Image = openshiftTestsImagePullSpec - hostNetworkToPodNetworkPollerDeployment = updateDeploymentENVs(hostNetworkToPodNetworkPollerDeployment, deploymentID, "") + hostNetworkToPodNetworkPollerDeployment = disruptionlibrary.UpdateDeploymentENVs(hostNetworkToPodNetworkPollerDeployment, deploymentID, "") if _, err = pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), hostNetworkToPodNetworkPollerDeployment, metav1.CreateOptions{}); err != nil { return err } hostNetworkToHostNetworkPollerDeployment.Spec.Replicas = &numNodes hostNetworkToHostNetworkPollerDeployment.Spec.Template.Spec.Containers[0].Image = openshiftTestsImagePullSpec - hostNetworkToHostNetworkPollerDeployment = updateDeploymentENVs(hostNetworkToHostNetworkPollerDeployment, deploymentID, "") + hostNetworkToHostNetworkPollerDeployment = disruptionlibrary.UpdateDeploymentENVs(hostNetworkToHostNetworkPollerDeployment, deploymentID, "") if _, err = pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), hostNetworkToHostNetworkPollerDeployment, metav1.CreateOptions{}); err != nil { return err } @@ -198,7 +183,7 @@ func (pna *podNetworkAvalibility) StartCollection(ctx context.Context, adminREST for _, deployment := range []*appsv1.Deployment{podNetworkServicePollerDep, hostNetworkServicePollerDep} { deployment.Spec.Replicas = &numNodes deployment.Spec.Template.Spec.Containers[0].Image = openshiftTestsImagePullSpec - deployment = updateDeploymentENVs(deployment, deploymentID, service.Spec.ClusterIP) + deployment = disruptionlibrary.UpdateDeploymentENVs(deployment, deploymentID, service.Spec.ClusterIP) if _, err = pna.kubeClient.AppsV1().Deployments(pna.namespaceName).Create(context.Background(), deployment, 
metav1.CreateOptions{}); err != nil { return err } @@ -274,72 +259,8 @@ func (pna *podNetworkAvalibility) collectDetailsForPoller(ctx context.Context, t if err != nil { return nil, nil, []error{err} } - pollerPods, err := pna.kubeClient.CoreV1().Pods(pna.namespaceName).List(ctx, metav1.ListOptions{ - LabelSelector: labels.NewSelector().Add(*pollerLabel).Add(*typeLabel).String(), - }) - if err != nil { - return nil, nil, []error{err} - } - - retIntervals := monitorapi.Intervals{} - junits := []*junitapi.JUnitTestCase{} - errs := []error{} - buf := &bytes.Buffer{} - podsWithoutIntervals := []string{} - for _, pollerPod := range pollerPods.Items { - fmt.Fprintf(buf, "\n\nLogs for -n %v pod/%v\n", pollerPod.Namespace, pollerPod.Name) - req := pna.kubeClient.CoreV1().Pods(pna.namespaceName).GetLogs(pollerPod.Name, &corev1.PodLogOptions{}) - if err != nil { - errs = append(errs, err) - continue - } - logStream, err := req.Stream(ctx) - if err != nil { - errs = append(errs, err) - continue - } - - foundInterval := false - scanner := bufio.NewScanner(logStream) - for scanner.Scan() { - line := scanner.Bytes() - buf.Write(line) - buf.Write([]byte("\n")) - if len(line) == 0 { - continue - } - - // not all lines are json, ignore errors. - if currInterval, err := monitorserialization.IntervalFromJSON(line); err == nil { - retIntervals = append(retIntervals, *currInterval) - foundInterval = true - } - } - if !foundInterval { - podsWithoutIntervals = append(podsWithoutIntervals, pollerPod.Name) - } - } - - failures := []string{} - if len(podsWithoutIntervals) > 0 { - failures = append(failures, fmt.Sprintf("%d pods lacked sampler output: [%v]", len(podsWithoutIntervals), strings.Join(podsWithoutIntervals, ", "))) - } - if len(pollerPods.Items) == 0 { - failures = append(failures, fmt.Sprintf("no pods found for poller %q", typeOfConnection)) - } - - logJunit := &junitapi.JUnitTestCase{ - Name: fmt.Sprintf("[sig-network] can collect %v poller pod logs", typeOfConnection), - SystemOut: string(buf.Bytes()), - } - if len(failures) > 0 { - logJunit.FailureOutput = &junitapi.FailureOutput{ - Output: strings.Join(failures, "\n"), - } - } - junits = append(junits, logJunit) - - return retIntervals, junits, errs + labelSelector := labels.NewSelector().Add(*pollerLabel).Add(*typeLabel) + return disruptionlibrary.CollectIntervalsForPods(ctx, pna.kubeClient, "sig-network", pna.namespaceName, labelSelector) } func (pna *podNetworkAvalibility) ConstructComputedIntervals(ctx context.Context, startingIntervals monitorapi.Intervals, recordedResources monitorapi.ResourcesMap, beginning, end time.Time) (constructedIntervals monitorapi.Intervals, err error) { diff --git a/pkg/monitortests/network/legacynetworkmonitortests/disruption.go b/pkg/monitortests/network/legacynetworkmonitortests/disruption.go index 8201965cdff4..545ddd73552e 100644 --- a/pkg/monitortests/network/legacynetworkmonitortests/disruption.go +++ b/pkg/monitortests/network/legacynetworkmonitortests/disruption.go @@ -5,27 +5,44 @@ import ( "strings" "time" + configv1 "github.com/openshift/api/config/v1" + "k8s.io/client-go/rest" + "github.com/openshift/origin/pkg/monitor/monitorapi" "github.com/openshift/origin/pkg/test/ginkgo/junitapi" "github.com/sirupsen/logrus" "sigs.k8s.io/kustomize/kyaml/sets" ) -func TestMultipleSingleSecondDisruptions(events monitorapi.Intervals) []*junitapi.JUnitTestCase { +func TestMultipleSingleSecondDisruptions(events monitorapi.Intervals, clientConfig *rest.Config) []*junitapi.JUnitTestCase { // multipleFailuresTestPrefix is 
for tests that track a few single second disruptions
 	const multipleFailuresTestPrefix = "[sig-network] there should be nearly zero single second disruptions for "
 	// manyFailureTestPrefix is for tests that track a lot of single second disruptions (more severe than the above)
 	const manyFailureTestPrefix = "[sig-network] there should be reasonably few single second disruptions for "
 
+	platform := configv1.NonePlatformType
+	if clientConfig != nil {
+		if actualPlatform, err := getPlatformType(clientConfig); err == nil {
+			platform = actualPlatform
+		}
+	}
+
 	allServers := sets.String{}
-	allDisruptionEventsIntervals := events.Filter(monitorapi.IsDisruptionEvent)
+	allDisruptionEventsIntervals := events.Filter(monitorapi.IsDisruptionEvent).Filter(monitorapi.HasRealLoadBalancer)
 	logrus.Infof("filtered %d intervals down to %d disruption intervals", len(events), len(allDisruptionEventsIntervals))
 	for _, eventInterval := range allDisruptionEventsIntervals {
 		backend := eventInterval.Locator.Keys[monitorapi.LocatorBackendDisruptionNameKey]
+		loadbalancer := eventInterval.Locator.Keys[monitorapi.LocatorLoadBalancerKey]
+		connection := eventInterval.Locator.Keys[monitorapi.LocatorConnectionKey]
+
 		switch {
 		case strings.HasPrefix(backend, "ingress-"):
 			allServers.Insert(backend)
 		case strings.Contains(backend, "-api-"):
+			if loadbalancer == "internal-lb" && connection == "reused" && platform == configv1.AWSPlatformType {
+				// OCPBUGS-43483: ignore single second disruptions on reused internal-lb connections on AWS
+				continue
+			}
 			allServers.Insert(backend)
 		}
 	}
diff --git a/pkg/monitortests/network/legacynetworkmonitortests/monitortest.go b/pkg/monitortests/network/legacynetworkmonitortests/monitortest.go
index 4e1321c4c21d..9725d987acbd 100644
--- a/pkg/monitortests/network/legacynetworkmonitortests/monitortest.go
+++ b/pkg/monitortests/network/legacynetworkmonitortests/monitortest.go
@@ -42,7 +42,7 @@ func (w *legacyMonitorTests) EvaluateTestsFromConstructedIntervals(ctx context.C
 	junits = append(junits, testNoOVSVswitchdUnreasonablyLongPollIntervals(finalIntervals)...)
 	junits = append(junits, testPodIPReuse(finalIntervals)...)
 	junits = append(junits, testErrorUpdatingEndpointSlices(finalIntervals)...)
-	junits = append(junits, TestMultipleSingleSecondDisruptions(finalIntervals)...)
+	junits = append(junits, TestMultipleSingleSecondDisruptions(finalIntervals, w.adminRESTConfig)...)
 	junits = append(junits, testDNSOverlapDisruption(finalIntervals)...)
 	junits = append(junits, testNoTooManyNetlinkEventLogs(finalIntervals)...)
diff --git a/pkg/test/ginkgo/cmd_runsuite.go b/pkg/test/ginkgo/cmd_runsuite.go index d46d39180215..e26d4fa26b29 100644 --- a/pkg/test/ginkgo/cmd_runsuite.go +++ b/pkg/test/ginkgo/cmd_runsuite.go @@ -23,12 +23,6 @@ import ( "github.com/sirupsen/logrus" "github.com/onsi/ginkgo/v2" - "github.com/spf13/pflag" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/cli-runtime/pkg/genericclioptions" - "k8s.io/client-go/discovery" - "k8s.io/client-go/rest" - "github.com/openshift/origin/pkg/clioptions/clusterinfo" "github.com/openshift/origin/pkg/defaultmonitortests" "github.com/openshift/origin/pkg/monitor" @@ -36,6 +30,11 @@ import ( "github.com/openshift/origin/pkg/monitortestframework" "github.com/openshift/origin/pkg/riskanalysis" "github.com/openshift/origin/pkg/test/ginkgo/junitapi" + "github.com/spf13/pflag" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/cli-runtime/pkg/genericclioptions" + "k8s.io/client-go/discovery" + "k8s.io/client-go/rest" ) const ( diff --git a/test/extended/operators/manifests/pod.yaml b/test/extended/operators/manifests/pod.yaml index de1ce8b25ef3..3f5e80884185 100644 --- a/test/extended/operators/manifests/pod.yaml +++ b/test/extended/operators/manifests/pod.yaml @@ -53,5 +53,9 @@ spec: - emptyDir: {} name: shared-dir tolerations: + # Ensure pod can be scheduled on master nodes - key: node-role.kubernetes.io/master effect: NoSchedule + # Ensure pod can be scheduled on edge nodes + - key: node-role.kubernetes.io/edge + effect: NoSchedule diff --git a/test/extended/util/disruption/controlplane/known_backends.go b/test/extended/util/disruption/controlplane/known_backends.go index dccf47a6f49f..57e2262e540e 100644 --- a/test/extended/util/disruption/controlplane/known_backends.go +++ b/test/extended/util/disruption/controlplane/known_backends.go @@ -2,88 +2,89 @@ package controlplane import ( "context" + "fmt" "time" - "github.com/openshift/origin/pkg/disruption/backend" + utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "github.com/openshift/origin/pkg/disruption/backend" disruptionci "github.com/openshift/origin/pkg/disruption/ci" "github.com/openshift/origin/pkg/monitor/monitorapi" - "k8s.io/client-go/rest" + imagev1 "github.com/openshift/client-go/image/clientset/versioned/typed/image/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -func StartAPIMonitoringUsingNewBackend(ctx context.Context, recorder monitorapi.Recorder, clusterConfig *rest.Config, lb backend.LoadBalancerType) error { - factory := disruptionci.NewDisruptionTestFactory(clusterConfig) - if err := startKubeAPIMonitoringWithNewConnectionsHTTP2(ctx, recorder, factory, lb); err != nil { - return err - } - if err := startKubeAPIMonitoringWithConnectionReuseHTTP2(ctx, recorder, factory, lb); err != nil { - return err - } - if err := startKubeAPIMonitoringWithNewConnectionsHTTP1(ctx, recorder, factory, lb); err != nil { - return err - } - if err := startKubeAPIMonitoringWithConnectionReuseHTTP1(ctx, recorder, factory, lb); err != nil { - return err - } - if err := startOpenShiftAPIMonitoringWithNewConnectionsHTTP2(ctx, recorder, factory, lb); err != nil { - return err - } - if err := startOpenShiftAPIMonitoringWithConnectionReuseHTTP2(ctx, recorder, factory, lb); err != nil { - return err - } - return nil -} +func StartAPIMonitoringUsingNewBackend( + ctx context.Context, + recorder monitorapi.Recorder, + clusterConfig *rest.Config, + kubeClient kubernetes.Interface, + lb backend.LoadBalancerType) ([]disruptionci.Sampler, error) { -func 
startKubeAPIMonitoringWithNewConnectionsHTTP2(ctx context.Context, recorder monitorapi.Recorder, factory disruptionci.Factory, lb backend.LoadBalancerType) error { - backendSampler, err := createKubeAPIMonitoringWithNewConnectionsHTTP2(factory, lb) - if err != nil { - return err - } - return backendSampler.StartEndpointMonitoring(ctx, recorder, nil) -} + samplers := []disruptionci.Sampler{} + errs := []error{} + factory := disruptionci.NewDisruptionTestFactory(clusterConfig, kubeClient) -func startKubeAPIMonitoringWithConnectionReuseHTTP2(ctx context.Context, recorder monitorapi.Recorder, factory disruptionci.Factory, lb backend.LoadBalancerType) error { - backendSampler, err := createKubeAPIMonitoringWithConnectionReuseHTTP2(factory, lb) + ns, err := kubeClient.CoreV1().Namespaces().Get(context.Background(), "default", metav1.GetOptions{}) if err != nil { - return err + return nil, err } - return backendSampler.StartEndpointMonitoring(ctx, recorder, nil) -} + path := fmt.Sprintf("/api/v1/namespaces/default?resourceVersion=%s", ns.ResourceVersion) -func startKubeAPIMonitoringWithNewConnectionsHTTP1(ctx context.Context, recorder monitorapi.Recorder, factory disruptionci.Factory, lb backend.LoadBalancerType) error { - backendSampler, err := createKubeAPIMonitoringWithNewConnectionsHTTP1(factory, lb) + sampler, err := createKubeAPIMonitoringWithNewConnectionsHTTP2(factory, lb, path) + samplers = append(samplers, sampler) + errs = append(errs, err) + + sampler, err = createKubeAPIMonitoringWithConnectionReuseHTTP2(factory, lb, path) + samplers = append(samplers, sampler) + errs = append(errs, err) + + sampler, err = createKubeAPIMonitoringWithNewConnectionsHTTP1(factory, lb, path) + samplers = append(samplers, sampler) + errs = append(errs, err) + + sampler, err = createKubeAPIMonitoringWithConnectionReuseHTTP1(factory, lb, path) + samplers = append(samplers, sampler) + errs = append(errs, err) + + client, err := imagev1.NewForConfig(clusterConfig) if err != nil { - return err + return nil, err } - return backendSampler.StartEndpointMonitoring(ctx, recorder, nil) -} - -func startKubeAPIMonitoringWithConnectionReuseHTTP1(ctx context.Context, recorder monitorapi.Recorder, factory disruptionci.Factory, lb backend.LoadBalancerType) error { - backendSampler, err := createKubeAPIMonitoringWithConnectionReuseHTTP1(factory, lb) + streams, err := client.ImageStreams("openshift").List(context.TODO(), metav1.ListOptions{}) if err != nil { - return err + return nil, err } - return backendSampler.StartEndpointMonitoring(ctx, recorder, nil) -} + if len(streams.Items) == 0 { + return nil, fmt.Errorf("no imagestreams found") + } + stream := streams.Items[0] + path = fmt.Sprintf("/apis/image.openshift.io/v1/namespaces/openshift/imagestreams/%s?resourceVersion=%s", stream.Name, stream.ResourceVersion) -func startOpenShiftAPIMonitoringWithNewConnectionsHTTP2(ctx context.Context, recorder monitorapi.Recorder, factory disruptionci.Factory, lb backend.LoadBalancerType) error { - backendSampler, err := createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory, lb) - if err != nil { - return err + sampler, err = createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory, lb, path) + samplers = append(samplers, sampler) + errs = append(errs, err) + + sampler, err = createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory, lb, path) + samplers = append(samplers, sampler) + errs = append(errs, err) + + if combinedErr := utilerrors.NewAggregate(errs); combinedErr != nil { + return nil, combinedErr } - return 
backendSampler.StartEndpointMonitoring(ctx, recorder, nil) -} -func startOpenShiftAPIMonitoringWithConnectionReuseHTTP2(ctx context.Context, recorder monitorapi.Recorder, factory disruptionci.Factory, lb backend.LoadBalancerType) error { - backendSampler, err := createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory, lb) - if err != nil { - return err + for _, currSampler := range samplers { + localErr := currSampler.StartEndpointMonitoring(ctx, recorder, nil) + errs = append(errs, localErr) } - return backendSampler.StartEndpointMonitoring(ctx, recorder, nil) + + return samplers, utilerrors.NewAggregate(errs) } -func createKubeAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType) (disruptionci.Sampler, error) { +func createKubeAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.KubeAPIServer, @@ -91,14 +92,14 @@ func createKubeAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory ConnectionType: monitorapi.NewConnectionType, Protocol: backend.ProtocolHTTP2, }, - Path: "/api/v1/namespaces/default", + Path: path, Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true, }) } -func createKubeAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType) (disruptionci.Sampler, error) { +func createKubeAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.KubeAPIServer, @@ -106,14 +107,14 @@ func createKubeAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factor ConnectionType: monitorapi.ReusedConnectionType, Protocol: backend.ProtocolHTTP2, }, - Path: "/api/v1/namespaces/default", + Path: path, Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true, }) } -func createKubeAPIMonitoringWithNewConnectionsHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType) (disruptionci.Sampler, error) { +func createKubeAPIMonitoringWithNewConnectionsHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.KubeAPIServer, @@ -121,14 +122,14 @@ func createKubeAPIMonitoringWithNewConnectionsHTTP1(factory disruptionci.Factory ConnectionType: monitorapi.NewConnectionType, Protocol: backend.ProtocolHTTP1, }, - Path: "/api/v1/namespaces/default", + Path: path, Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true, }) } -func createKubeAPIMonitoringWithConnectionReuseHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType) (disruptionci.Sampler, error) { +func createKubeAPIMonitoringWithConnectionReuseHTTP1(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.KubeAPIServer, @@ -136,14 +137,14 @@ func createKubeAPIMonitoringWithConnectionReuseHTTP1(factory disruptionci.Factor ConnectionType: monitorapi.ReusedConnectionType, Protocol: 
backend.ProtocolHTTP1, }, - Path: "/api/v1/namespaces/default", + Path: path, Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true, }) } -func createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType) (disruptionci.Sampler, error) { +func createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.OpenShiftAPIServer, @@ -151,14 +152,14 @@ func createOpenShiftAPIMonitoringWithNewConnectionsHTTP2(factory disruptionci.Fa ConnectionType: monitorapi.NewConnectionType, Protocol: backend.ProtocolHTTP2, }, - Path: "/apis/image.openshift.io/v1/namespaces/default/imagestreams", + Path: path, Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true, }) } -func createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType) (disruptionci.Sampler, error) { +func createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.Factory, lb backend.LoadBalancerType, path string) (disruptionci.Sampler, error) { return factory.New(disruptionci.TestConfiguration{ TestDescriptor: disruptionci.TestDescriptor{ TargetServer: disruptionci.OpenShiftAPIServer, @@ -166,7 +167,7 @@ func createOpenShiftAPIMonitoringWithConnectionReuseHTTP2(factory disruptionci.F ConnectionType: monitorapi.ReusedConnectionType, Protocol: backend.ProtocolHTTP2, }, - Path: "/apis/image.openshift.io/v1/namespaces/default/imagestreams", + Path: path, Timeout: 15 * time.Second, SampleInterval: time.Second, EnableShutdownResponseHeader: true,