Skip to content

Commit

Permalink
Merge pull request #29278 from neisw/revert-29058-in-cluster-fixes-v6
Browse files Browse the repository at this point in the history
Revert "Reapply "OCPBUGS-18865: Reapply "Merge pull request #28944 from vrutkovs/in-cluster-fixes-v4"""
  • Loading branch information
openshift-merge-bot[bot] authored Nov 11, 2024
2 parents 5dff126 + 9d0bf5d commit 80b1c6e
Show file tree
Hide file tree
Showing 34 changed files with 621 additions and 1,500 deletions.
7 changes: 0 additions & 7 deletions pkg/clioptions/iooptions/io_options.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package iooptions

import (
"fmt"
"io"
"os"
"path"

"github.com/spf13/pflag"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand Down Expand Up @@ -38,11 +36,6 @@ func (o *OutputFlags) ConfigureIOStreams(streams genericclioptions.IOStreams, st
return doNothing, nil
}

dir := path.Dir(o.OutFile)
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return doNothing, fmt.Errorf("failed to create parentdir %q: %w", dir, err)
}

f, err := os.OpenFile(o.OutFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return doNothing, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/openshift-tests/dev/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ a running cluster.
logrus.Infof("loaded %d intervals", len(intervals))

logrus.Info("running tests")
junits := legacynetworkmonitortests.TestMultipleSingleSecondDisruptions(intervals, nil)
junits := legacynetworkmonitortests.TestMultipleSingleSecondDisruptions(intervals)
for _, junit := range junits {
if junit.FailureOutput != nil {
logrus.Errorf("FAIL: %s", junit.Name)
Expand Down
5 changes: 3 additions & 2 deletions pkg/cmd/openshift-tests/monitor/run/run_monitor_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ import (

"github.com/spf13/pflag"

"github.com/openshift/origin/pkg/defaultmonitortests"
"github.com/openshift/origin/pkg/monitor"
"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/kubectl/pkg/util/templates"

"github.com/openshift/origin/pkg/defaultmonitortests"
"github.com/openshift/origin/pkg/monitor"
)

type RunMonitorFlags struct {
Expand Down
298 changes: 116 additions & 182 deletions pkg/cmd/openshift-tests/run-disruption/disruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,51 +6,46 @@ import (
"io"
"os"
"os/signal"
"sync"
"path/filepath"
"syscall"
"time"

utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"github.com/openshift/origin/pkg/clioptions/clusterinfo"

"k8s.io/apimachinery/pkg/fields"
monitorserialization "github.com/openshift/origin/pkg/monitor/serialization"

"github.com/openshift/origin/pkg/clioptions/iooptions"
"github.com/openshift/origin/pkg/disruption/backend"
disruptionci "github.com/openshift/origin/pkg/disruption/ci"
"github.com/openshift/origin/pkg/monitor"
"github.com/openshift/origin/test/extended/util/disruption/controlplane"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
apimachinerywatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/watch"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/util/templates"
)

type RunAPIDisruptionMonitorFlags struct {
ConfigFlags *genericclioptions.ConfigFlags
OutputFlags *iooptions.OutputFlags
"github.com/openshift/origin/pkg/disruption/backend"
"github.com/openshift/origin/pkg/monitor"
"github.com/openshift/origin/pkg/monitor/apiserveravailability"
"github.com/openshift/origin/pkg/monitor/monitorapi"
"github.com/openshift/origin/test/extended/util/disruption/controlplane"
"github.com/spf13/cobra"
)

ArtifactDir string
LoadBalancerType string
StopConfigMapName string
// RunAPIDisruptionMonitorOptions sets options for api server disruption monitor
type RunAPIDisruptionMonitorOptions struct {
Out, ErrOut io.Writer

genericclioptions.IOStreams
ArtifactDir string
LoadBalancerType string
ExtraMessage string
}

func NewRunInClusterDisruptionMonitorFlags(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorFlags {
return &RunAPIDisruptionMonitorFlags{
ConfigFlags: genericclioptions.NewConfigFlags(false),
OutputFlags: iooptions.NewOutputOptions(),
IOStreams: ioStreams,
// NewRunInClusterDisruptionMonitorOptions returns options for the
// run-disruption command whose Out and ErrOut writers are taken from the
// supplied IO streams. All other fields are left at their zero values and are
// expected to be filled in by command-line flags.
func NewRunInClusterDisruptionMonitorOptions(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorOptions {
	opts := RunAPIDisruptionMonitorOptions{
		Out:    ioStreams.Out,
		ErrOut: ioStreams.ErrOut,
	}
	return &opts
}

func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStreams) *cobra.Command {
f := NewRunInClusterDisruptionMonitorFlags(ioStreams)
disruptionOpt := NewRunInClusterDisruptionMonitorOptions(ioStreams)
cmd := &cobra.Command{
Use: "run-disruption",
Short: "Run API server disruption monitor",
Expand All @@ -61,183 +56,122 @@ func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStrea
SilenceUsage: true,
SilenceErrors: true,
RunE: func(cmd *cobra.Command, args []string) error {
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
abortCh := make(chan os.Signal, 2)
go func() {
<-abortCh
fmt.Fprintf(f.ErrOut, "Interrupted, terminating\n")
cancelFn()

sig := <-abortCh
fmt.Fprintf(f.ErrOut, "Interrupted twice, exiting (%s)\n", sig)
switch sig {
case syscall.SIGINT:
os.Exit(130)
default:
os.Exit(0)
}
}()
signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM)

if err := f.Validate(); err != nil {
return err
}

o, err := f.ToOptions()
if err != nil {
return err
}

return o.Run(ctx)
return disruptionOpt.Run()
},
}

f.AddFlags(cmd.Flags())

cmd.Flags().StringVar(&disruptionOpt.ArtifactDir,
"artifact-dir", disruptionOpt.ArtifactDir,
"The directory where monitor events will be stored.")
cmd.Flags().StringVar(&disruptionOpt.LoadBalancerType,
"lb-type", disruptionOpt.LoadBalancerType,
"Set load balancer type, available options: internal-lb, service-network, external-lb (default)")
cmd.Flags().StringVar(&disruptionOpt.ExtraMessage,
"extra-message", disruptionOpt.ExtraMessage,
"Add custom label to disruption event message")
return cmd
}

// AddFlags registers the command's flags on the given flag set: the two flags
// owned by this type (lb-type and stop-configmap), followed by whatever the
// ConfigFlags and OutputFlags helpers register themselves.
func (f *RunAPIDisruptionMonitorFlags) AddFlags(flags *pflag.FlagSet) {
	// Flags owned directly by this command; the current field values act as
	// the defaults.
	flags.StringVar(
		&f.LoadBalancerType,
		"lb-type",
		f.LoadBalancerType,
		"Set load balancer type, available options: internal-lb, service-network, external-lb (default)",
	)
	flags.StringVar(
		&f.StopConfigMapName,
		"stop-configmap",
		f.StopConfigMapName,
		"the name of the configmap that indicates that this pod should stop all watchers.",
	)

	// Delegate to the helper flag groups, in the same order as before.
	f.ConfigFlags.AddFlags(flags)
	f.OutputFlags.BindFlags(flags)
}

// SetIOStreams replaces the IO streams held by these flags.
// NOTE(review): f itself is passed to OutputFlags.ConfigureIOStreams in
// ToOptions, so this is presumably the hook that redirects output to the
// configured output file — confirm against iooptions.
func (f *RunAPIDisruptionMonitorFlags) SetIOStreams(streams genericclioptions.IOStreams) {
	f.IOStreams = streams
}

// Validate checks that the required flags were supplied. It returns an error
// naming the first missing flag (output-file is checked before
// stop-configmap), or nil when both are set.
func (f *RunAPIDisruptionMonitorFlags) Validate() error {
	switch {
	case f.OutputFlags.OutFile == "":
		return fmt.Errorf("output-file must be specified")
	case f.StopConfigMapName == "":
		return fmt.Errorf("stop-configmap must be specified")
	default:
		return nil
	}
}

func (f *RunAPIDisruptionMonitorFlags) ToOptions() (*RunAPIDisruptionMonitorOptions, error) {
originalOutStream := f.IOStreams.Out
closeFn, err := f.OutputFlags.ConfigureIOStreams(f.IOStreams, f)
func (opt *RunAPIDisruptionMonitorOptions) Run() error {
restConfig, err := clusterinfo.GetMonitorRESTConfig()
if err != nil {
return nil, err
return err
}

namespace, _, err := f.ConfigFlags.ToRawKubeConfigLoader().Namespace()
if err != nil {
return nil, err
}
if len(namespace) == 0 {
return nil, fmt.Errorf("namespace must be specified")
}
lb := backend.ParseStringToLoadBalancerType(opt.LoadBalancerType)

restConfig, err := f.ConfigFlags.ToRESTConfig()
if err != nil {
return nil, err
}
kubeClient, err := kubernetes.NewForConfig(restConfig)
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
abortCh := make(chan os.Signal, 2)
go func() {
<-abortCh
fmt.Fprintf(opt.ErrOut, "Interrupted, terminating\n")
// Give some time to store intervals on disk
time.Sleep(5 * time.Second)
cancelFn()
sig := <-abortCh
fmt.Fprintf(opt.ErrOut, "Interrupted twice, exiting (%s)\n", sig)
switch sig {
case syscall.SIGINT:
os.Exit(130)
default:
os.Exit(0)
}
}()
signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM)

recorder, err := StartAPIAvailability(ctx, restConfig, lb)
if err != nil {
return nil, err
return err
}

return &RunAPIDisruptionMonitorOptions{
KubeClient: kubeClient,
KubeClientConfig: restConfig,
OutputFile: f.OutputFlags.OutFile,
LoadBalancerType: f.LoadBalancerType,
StopConfigMapName: f.StopConfigMapName,
Namespace: namespace,
CloseFn: closeFn,
OriginalOutFile: originalOutStream,
IOStreams: f.IOStreams,
}, nil
}

// RunAPIDisruptionMonitorOptions sets options for api server disruption monitor.
// It is the resolved form of RunAPIDisruptionMonitorFlags, produced by ToOptions
// and consumed by Run.
type RunAPIDisruptionMonitorOptions struct {
// KubeClient is the clientset built from KubeClientConfig; it is handed to the
// sampler startup and used to watch the stop configmap.
KubeClient kubernetes.Interface
// KubeClientConfig is the REST config obtained from the config flags.
KubeClientConfig *rest.Config
// OutputFile is the output file path; Run reads any content already in it at startup.
OutputFile string
// LoadBalancerType is the raw flag string; Run parses it with
// backend.ParseStringToLoadBalancerType.
LoadBalancerType string
// StopConfigMapName names the configmap whose Added/Modified watch event
// ends WaitForStopSignal and thereby stops the monitor.
StopConfigMapName string
// Namespace is the namespace in which the stop configmap is watched.
Namespace string

// OriginalOutFile is the Out stream captured before the IO streams were
// reconfigured; Run replays pre-existing output-file content to it.
OriginalOutFile io.Writer
// CloseFn is the close function returned by OutputFlags.ConfigureIOStreams.
// NOTE(review): no call to it is visible in this view — confirm it is invoked.
CloseFn iooptions.CloseFunc
// IOStreams: Out receives status messages and is the sink of the JSONL
// recorder; ErrOut receives failure messages.
genericclioptions.IOStreams
}
go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
var last time.Time
done := false
for !done {
select {
case <-ticker.C:
case <-ctx.Done():
done = true
}
events := recorder.Intervals(last, time.Time{})
if len(events) > 0 {
for _, event := range events {
if !event.From.Equal(event.To) {
continue
}
fmt.Fprintln(opt.Out, event.String())
}
last = events[len(events)-1].From
}
}
}()

func (o *RunAPIDisruptionMonitorOptions) Run(ctx context.Context) error {
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()
<-ctx.Done()

fmt.Fprintf(o.Out, "Starting up.")
// Store intervals to artifact directory
intervals := recorder.Intervals(time.Time{}, time.Time{})
if len(opt.ExtraMessage) > 0 {
fmt.Fprintf(opt.Out, "\nAppending %s to recorded event message\n", opt.ExtraMessage)
for i, event := range intervals {
intervals[i].Message.HumanMessage = fmt.Sprintf("%s user-provided-message=%s", event.Message.HumanMessage, opt.ExtraMessage)
}
}

startingContent, err := os.ReadFile(o.OutputFile)
if err != nil && !os.IsNotExist(err) {
eventDir := filepath.Join(opt.ArtifactDir, monitorapi.EventDir)
if err := os.MkdirAll(eventDir, os.ModePerm); err != nil {
fmt.Printf("Failed to create monitor-events directory, err: %v\n", err)
return err
}
if len(startingContent) > 0 {
// print starting content to the log so that we can simply scrape the log to find all entries at the end.
o.OriginalOutFile.Write(startingContent)
}

lb := backend.ParseStringToLoadBalancerType(o.LoadBalancerType)

recorder := monitor.WrapWithJSONLRecorder(monitor.NewRecorder(), o.IOStreams.Out, nil)
samplers, err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, o.KubeClientConfig, o.KubeClient, lb)
if err != nil {
timeSuffix := fmt.Sprintf("_%s", time.Now().UTC().Format("20060102-150405"))
if err := monitorserialization.EventsToFile(filepath.Join(eventDir, fmt.Sprintf("e2e-events%s.json", timeSuffix)), intervals); err != nil {
fmt.Printf("Failed to write event data, err: %v\n", err)
return err
}
fmt.Fprintf(opt.Out, "\nEvent data written, exiting\n")

go func(ctx context.Context) {
defer cancelFn()
err := o.WaitForStopSignal(ctx)
if err != nil {
fmt.Fprintf(o.ErrOut, "failure waiting for stop: %v", err)
}
}(ctx)
return nil
}

<-ctx.Done()
// StartAPIAvailability monitors just the cluster availability
func StartAPIAvailability(ctx context.Context, restConfig *rest.Config, lb backend.LoadBalancerType) (monitorapi.Recorder, error) {
recorder := monitor.NewRecorder()

fmt.Fprintf(o.Out, "waiting for samplers to stop")
wg := sync.WaitGroup{}
for i := range samplers {
wg.Add(1)
func(sampler disruptionci.Sampler) {
defer wg.Done()
sampler.Stop()
}(samplers[i])
client, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}
if err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, restConfig, lb); err != nil {
return nil, err
}
wg.Wait()
fmt.Fprintf(o.Out, "samplers stopped")

return nil
}

func (o *RunAPIDisruptionMonitorOptions) WaitForStopSignal(ctx context.Context) error {
defer utilruntime.HandleCrash()

_, err := watch.UntilWithSync(
ctx,
cache.NewListWatchFromClient(
o.KubeClient.CoreV1().RESTClient(), "configmaps", o.Namespace, fields.OneTermEqualSelector("metadata.name", o.StopConfigMapName)),
&corev1.ConfigMap{},
nil,
func(event apimachinerywatch.Event) (bool, error) {
switch event.Type {
case apimachinerywatch.Added:
return true, nil
case apimachinerywatch.Modified:
return true, nil
}
return false, nil
},
)
return err
// read the state of the cluster apiserver client access issues *before* any test (like upgrade) begins
intervals, err := apiserveravailability.APIServerAvailabilityIntervalsFromCluster(client, time.Time{}, time.Time{})
if err != nil {
klog.Errorf("error reading initial apiserver availability: %v", err)
}
recorder.AddIntervals(intervals...)
return recorder, nil
}
Loading

0 comments on commit 80b1c6e

Please sign in to comment.