Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disruption tests: accept gzipped protobuf responses #29222

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/clioptions/iooptions/io_options.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package iooptions

import (
"fmt"
"io"
"os"
"path"

"github.com/spf13/pflag"
"k8s.io/cli-runtime/pkg/genericclioptions"
Expand Down Expand Up @@ -36,6 +38,11 @@ func (o *OutputFlags) ConfigureIOStreams(streams genericclioptions.IOStreams, st
return doNothing, nil
}

dir := path.Dir(o.OutFile)
if err := os.MkdirAll(dir, os.ModePerm); err != nil {
return doNothing, fmt.Errorf("failed to create parentdir %q: %w", dir, err)
}

f, err := os.OpenFile(o.OutFile, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0640)
if err != nil {
return doNothing, err
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/openshift-tests/dev/dev.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ a running cluster.
logrus.Infof("loaded %d intervals", len(intervals))

logrus.Info("running tests")
junits := legacynetworkmonitortests.TestMultipleSingleSecondDisruptions(intervals)
junits := legacynetworkmonitortests.TestMultipleSingleSecondDisruptions(intervals, nil)
for _, junit := range junits {
if junit.FailureOutput != nil {
logrus.Errorf("FAIL: %s", junit.Name)
Expand Down
5 changes: 2 additions & 3 deletions pkg/cmd/openshift-tests/monitor/run/run_monitor_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,11 @@ import (

"github.com/spf13/pflag"

"github.com/openshift/origin/pkg/defaultmonitortests"
"github.com/openshift/origin/pkg/monitor"
"github.com/spf13/cobra"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/kubectl/pkg/util/templates"

"github.com/openshift/origin/pkg/defaultmonitortests"
"github.com/openshift/origin/pkg/monitor"
)

type RunMonitorFlags struct {
Expand Down
298 changes: 182 additions & 116 deletions pkg/cmd/openshift-tests/run-disruption/disruption.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,46 +6,51 @@ import (
"io"
"os"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"

"github.com/openshift/origin/pkg/clioptions/clusterinfo"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"

monitorserialization "github.com/openshift/origin/pkg/monitor/serialization"

"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
"k8s.io/kubectl/pkg/util/templates"
"k8s.io/apimachinery/pkg/fields"

"github.com/openshift/origin/pkg/clioptions/iooptions"
"github.com/openshift/origin/pkg/disruption/backend"
disruptionci "github.com/openshift/origin/pkg/disruption/ci"
"github.com/openshift/origin/pkg/monitor"
"github.com/openshift/origin/pkg/monitor/apiserveravailability"
"github.com/openshift/origin/pkg/monitor/monitorapi"
"github.com/openshift/origin/test/extended/util/disruption/controlplane"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
corev1 "k8s.io/api/core/v1"
apimachinerywatch "k8s.io/apimachinery/pkg/watch"
"k8s.io/cli-runtime/pkg/genericclioptions"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/watch"
"k8s.io/kubectl/pkg/util/templates"
)

// RunAPIDisruptionMonitorOptions sets options for api server disruption monitor
type RunAPIDisruptionMonitorOptions struct {
Out, ErrOut io.Writer
type RunAPIDisruptionMonitorFlags struct {
ConfigFlags *genericclioptions.ConfigFlags
OutputFlags *iooptions.OutputFlags

ArtifactDir string
LoadBalancerType string
StopConfigMapName string

ArtifactDir string
LoadBalancerType string
ExtraMessage string
genericclioptions.IOStreams
}

func NewRunInClusterDisruptionMonitorOptions(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorOptions {
return &RunAPIDisruptionMonitorOptions{
Out: ioStreams.Out,
ErrOut: ioStreams.ErrOut,
func NewRunInClusterDisruptionMonitorFlags(ioStreams genericclioptions.IOStreams) *RunAPIDisruptionMonitorFlags {
return &RunAPIDisruptionMonitorFlags{
ConfigFlags: genericclioptions.NewConfigFlags(false),
OutputFlags: iooptions.NewOutputOptions(),
IOStreams: ioStreams,
}
}

func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStreams) *cobra.Command {
disruptionOpt := NewRunInClusterDisruptionMonitorOptions(ioStreams)
f := NewRunInClusterDisruptionMonitorFlags(ioStreams)
cmd := &cobra.Command{
Use: "run-disruption",
Short: "Run API server disruption monitor",
Expand All @@ -56,122 +61,183 @@ func NewRunInClusterDisruptionMonitorCommand(ioStreams genericclioptions.IOStrea
SilenceUsage: true,
SilenceErrors: true,
RunE: func(cmd *cobra.Command, args []string) error {
return disruptionOpt.Run()
ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
abortCh := make(chan os.Signal, 2)
go func() {
<-abortCh
fmt.Fprintf(f.ErrOut, "Interrupted, terminating\n")
cancelFn()

sig := <-abortCh
fmt.Fprintf(f.ErrOut, "Interrupted twice, exiting (%s)\n", sig)
switch sig {
case syscall.SIGINT:
os.Exit(130)
default:
os.Exit(0)
}
}()
signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM)

if err := f.Validate(); err != nil {
return err
}

o, err := f.ToOptions()
if err != nil {
return err
}

return o.Run(ctx)
},
}
cmd.Flags().StringVar(&disruptionOpt.ArtifactDir,
"artifact-dir", disruptionOpt.ArtifactDir,
"The directory where monitor events will be stored.")
cmd.Flags().StringVar(&disruptionOpt.LoadBalancerType,
"lb-type", disruptionOpt.LoadBalancerType,
"Set load balancer type, available options: internal-lb, service-network, external-lb (default)")
cmd.Flags().StringVar(&disruptionOpt.ExtraMessage,
"extra-message", disruptionOpt.ExtraMessage,
"Add custom label to disruption event message")

f.AddFlags(cmd.Flags())

return cmd
}

func (opt *RunAPIDisruptionMonitorOptions) Run() error {
restConfig, err := clusterinfo.GetMonitorRESTConfig()
if err != nil {
return err
}
func (f *RunAPIDisruptionMonitorFlags) AddFlags(flags *pflag.FlagSet) {
flags.StringVar(&f.LoadBalancerType, "lb-type", f.LoadBalancerType, "Set load balancer type, available options: internal-lb, service-network, external-lb (default)")
flags.StringVar(&f.StopConfigMapName, "stop-configmap", f.StopConfigMapName, "the name of the configmap that indicates that this pod should stop all watchers.")

lb := backend.ParseStringToLoadBalancerType(opt.LoadBalancerType)
f.ConfigFlags.AddFlags(flags)
f.OutputFlags.BindFlags(flags)
}

ctx, cancelFn := context.WithCancel(context.Background())
defer cancelFn()
abortCh := make(chan os.Signal, 2)
go func() {
<-abortCh
fmt.Fprintf(opt.ErrOut, "Interrupted, terminating\n")
// Give some time to store intervals on disk
time.Sleep(5 * time.Second)
cancelFn()
sig := <-abortCh
fmt.Fprintf(opt.ErrOut, "Interrupted twice, exiting (%s)\n", sig)
switch sig {
case syscall.SIGINT:
os.Exit(130)
default:
os.Exit(0)
}
}()
signal.Notify(abortCh, syscall.SIGINT, syscall.SIGTERM)
// SetIOStreams replaces the IOStreams embedded in the flags with streams.
// ToOptions hands f itself to OutputFlags.ConfigureIOStreams; presumably that
// call uses this setter to install the redirected streams — confirm against
// the iooptions package.
func (f *RunAPIDisruptionMonitorFlags) SetIOStreams(streams genericclioptions.IOStreams) {
f.IOStreams = streams
}

recorder, err := StartAPIAvailability(ctx, restConfig, lb)
if err != nil {
return err
func (f *RunAPIDisruptionMonitorFlags) Validate() error {
if len(f.OutputFlags.OutFile) == 0 {
return fmt.Errorf("output-file must be specified")
}
if len(f.StopConfigMapName) == 0 {
return fmt.Errorf("stop-configmap must be specified")
}

go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
var last time.Time
done := false
for !done {
select {
case <-ticker.C:
case <-ctx.Done():
done = true
}
events := recorder.Intervals(last, time.Time{})
if len(events) > 0 {
for _, event := range events {
if !event.From.Equal(event.To) {
continue
}
fmt.Fprintln(opt.Out, event.String())
}
last = events[len(events)-1].From
}
}
}()

<-ctx.Done()
return nil
}

// Store intervals to artifact directory
intervals := recorder.Intervals(time.Time{}, time.Time{})
if len(opt.ExtraMessage) > 0 {
fmt.Fprintf(opt.Out, "\nAppending %s to recorded event message\n", opt.ExtraMessage)
for i, event := range intervals {
intervals[i].Message.HumanMessage = fmt.Sprintf("%s user-provided-message=%s", event.Message.HumanMessage, opt.ExtraMessage)
}
func (f *RunAPIDisruptionMonitorFlags) ToOptions() (*RunAPIDisruptionMonitorOptions, error) {
originalOutStream := f.IOStreams.Out
closeFn, err := f.OutputFlags.ConfigureIOStreams(f.IOStreams, f)
if err != nil {
return nil, err
}

eventDir := filepath.Join(opt.ArtifactDir, monitorapi.EventDir)
if err := os.MkdirAll(eventDir, os.ModePerm); err != nil {
fmt.Printf("Failed to create monitor-events directory, err: %v\n", err)
return err
namespace, _, err := f.ConfigFlags.ToRawKubeConfigLoader().Namespace()
if err != nil {
return nil, err
}
if len(namespace) == 0 {
return nil, fmt.Errorf("namespace must be specified")
}

timeSuffix := fmt.Sprintf("_%s", time.Now().UTC().Format("20060102-150405"))
if err := monitorserialization.EventsToFile(filepath.Join(eventDir, fmt.Sprintf("e2e-events%s.json", timeSuffix)), intervals); err != nil {
fmt.Printf("Failed to write event data, err: %v\n", err)
return err
restConfig, err := f.ConfigFlags.ToRESTConfig()
if err != nil {
return nil, err
}
kubeClient, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
}
fmt.Fprintf(opt.Out, "\nEvent data written, exiting\n")

return nil
return &RunAPIDisruptionMonitorOptions{
KubeClient: kubeClient,
KubeClientConfig: restConfig,
OutputFile: f.OutputFlags.OutFile,
LoadBalancerType: f.LoadBalancerType,
StopConfigMapName: f.StopConfigMapName,
Namespace: namespace,
CloseFn: closeFn,
OriginalOutFile: originalOutStream,
IOStreams: f.IOStreams,
}, nil
}

// StartAPIAvailability monitors just the cluster availability
func StartAPIAvailability(ctx context.Context, restConfig *rest.Config, lb backend.LoadBalancerType) (monitorapi.Recorder, error) {
recorder := monitor.NewRecorder()
// RunAPIDisruptionMonitorOptions sets options for api server disruption monitor.
// It is the resolved form of RunAPIDisruptionMonitorFlags, built by ToOptions.
type RunAPIDisruptionMonitorOptions struct {
// KubeClient is passed to the API monitoring backend in Run and is used by
// WaitForStopSignal to watch configmaps.
KubeClient kubernetes.Interface
// KubeClientConfig is the REST config KubeClient was built from; Run passes
// it to the API monitoring backend.
KubeClientConfig *rest.Config
// OutputFile is the path Run reads at startup; any existing content is
// copied to OriginalOutFile. A missing file is not an error.
OutputFile string
// LoadBalancerType is the raw flag value; Run converts it with
// backend.ParseStringToLoadBalancerType.
LoadBalancerType string
// StopConfigMapName is the metadata.name of the configmap whose Added or
// Modified event makes WaitForStopSignal return.
StopConfigMapName string
// Namespace is the namespace in which the stop configmap is watched.
Namespace string

// OriginalOutFile is the Out stream captured in ToOptions before
// ConfigureIOStreams ran; Run writes pre-existing OutputFile content to it.
OriginalOutFile io.Writer
// CloseFn is the close function returned by ConfigureIOStreams.
// NOTE(review): nothing in the code visible here calls it — confirm who is
// expected to invoke it.
CloseFn iooptions.CloseFunc
// IOStreams supplies Out (status messages and the JSONL recorder output in
// Run) and ErrOut (error messages).
genericclioptions.IOStreams
}

client, err := kubernetes.NewForConfig(restConfig)
if err != nil {
return nil, err
// Run starts the API disruption samplers and blocks until ctx is done, then
// stops every sampler before returning.
//
// ctx is wrapped in a cancellable child context; that child is cancelled either
// by the caller's ctx or when the WaitForStopSignal goroutine returns.
// Any content already present in o.OutputFile is copied to o.OriginalOutFile
// before monitoring starts. Intervals are recorded through a JSONL recorder
// that writes to o.IOStreams.Out.
//
// NOTE(review): this text comes from a rendered diff. Some lines inside the
// body (those referencing restConfig, client, intervals, klog, and the
// two-value returns) look like removed lines of the previous implementation
// rather than part of this method — confirm against the real file.
// NOTE(review): o.CloseFn is not invoked anywhere in this body as shown.
func (o *RunAPIDisruptionMonitorOptions) Run(ctx context.Context) error {
ctx, cancelFn := context.WithCancel(ctx)
defer cancelFn()

// No trailing newline is written after this status message.
fmt.Fprintf(o.Out, "Starting up.")

// A missing output file is fine; any other read error aborts the run.
startingContent, err := os.ReadFile(o.OutputFile)
if err != nil && !os.IsNotExist(err) {
return err
}
if err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, restConfig, lb); err != nil {
return nil, err
if len(startingContent) > 0 {
// print starting content to the log so that we can simply scrape the log to find all entries at the end.
// The error returned by Write is not checked.
o.OriginalOutFile.Write(startingContent)
}

// read the state of the cluster apiserver client access issues *before* any test (like upgrade) begins
intervals, err := apiserveravailability.APIServerAvailabilityIntervalsFromCluster(client, time.Time{}, time.Time{})
lb := backend.ParseStringToLoadBalancerType(o.LoadBalancerType)

recorder := monitor.WrapWithJSONLRecorder(monitor.NewRecorder(), o.IOStreams.Out, nil)
samplers, err := controlplane.StartAPIMonitoringUsingNewBackend(ctx, recorder, o.KubeClientConfig, o.KubeClient, lb)
if err != nil {
klog.Errorf("error reading initial apiserver availability: %v", err)
return err
}
recorder.AddIntervals(intervals...)
return recorder, nil

// Watch for the stop configmap in the background; when the wait ends (for
// any reason) cancel the context so the main flow below proceeds to shutdown.
go func(ctx context.Context) {
defer cancelFn()
err := o.WaitForStopSignal(ctx)
if err != nil {
fmt.Fprintf(o.ErrOut, "failure waiting for stop: %v", err)
}
}(ctx)

<-ctx.Done()

fmt.Fprintf(o.Out, "waiting for samplers to stop")
wg := sync.WaitGroup{}
for i := range samplers {
wg.Add(1)
// NOTE(review): this function literal is called directly, without the `go`
// keyword, so each sampler.Stop() completes before the next iteration and
// wg.Wait() below never actually blocks. If parallel shutdown was intended,
// this likely should be `go func(...)` — confirm.
func(sampler disruptionci.Sampler) {
defer wg.Done()
sampler.Stop()
}(samplers[i])
}
wg.Wait()
fmt.Fprintf(o.Out, "samplers stopped")

return nil
}

// WaitForStopSignal blocks until the configmap named o.StopConfigMapName in
// o.Namespace is observed with an Added or Modified watch event, or until
// watch.UntilWithSync gives up (for example when ctx ends). The error from
// watch.UntilWithSync is returned unchanged.
func (o *RunAPIDisruptionMonitorOptions) WaitForStopSignal(ctx context.Context) error {
	defer utilruntime.HandleCrash()

	// Narrow the list/watch to the single configmap that serves as the stop marker.
	byName := fields.OneTermEqualSelector("metadata.name", o.StopConfigMapName)
	stopMarkerLW := cache.NewListWatchFromClient(o.KubeClient.CoreV1().RESTClient(), "configmaps", o.Namespace, byName)

	// Creation or update of the marker is the stop signal; every other event
	// type keeps the wait going.
	sawStopMarker := func(ev apimachinerywatch.Event) (bool, error) {
		stop := ev.Type == apimachinerywatch.Added || ev.Type == apimachinerywatch.Modified
		return stop, nil
	}

	_, err := watch.UntilWithSync(ctx, stopMarkerLW, &corev1.ConfigMap{}, nil, sawStopMarker)
	return err
}
Loading