Skip to content

Commit

Permalink
[0.17] Ported recent test infra improvements (#4403)
Browse files Browse the repository at this point in the history
* Nit (#4385)

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

(cherry picked from commit 3ceaad4)
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* emit a k8s event when dropping events (#4389)

* emit a k8s event when dropping events

Signed-off-by: Ville Aikas <vaikas@vmware.com>

* go imports

Signed-off-by: Ville Aikas <vaikas@vmware.com>

* tags

Signed-off-by: Ville Aikas <vaikas@vmware.com>

* fix silliness

Signed-off-by: Ville Aikas <vaikas@vmware.com>

* simplify

Signed-off-by: Ville Aikas <vaikas@vmware.com>
(cherry picked from commit 0a54fd9)

* [recordevents] Removed EventBroadcaster usage and replaced with manual send (#4393)

* Removed EventBroadcaster usage and replaced with manual creation and send of events

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Boilerplate

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Remove redundant format
Removed sequence annotation

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Added required value

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* ?!?!

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Maybe this one fix the issue?

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Maybe this one fix the issue?

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Removed useless double log line

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Remove useless headers

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Missing host header

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Nit

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Now it works on my machine, i'm warning you prow!

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Nit

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Now it works for long events too

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Suggestions

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Fixed the dropped counter thing

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Nit

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

(cherry picked from commit 7de59ec)
Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

* Wrong merge fix

Signed-off-by: Francesco Guardiani <francescoguard@gmail.com>

Co-authored-by: Ville Aikas <11279988+vaikas@users.noreply.github.com>
  • Loading branch information
slinkydeveloper and vaikas authored Oct 27, 2020
1 parent 1859368 commit ba45a11
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 125 deletions.
14 changes: 14 additions & 0 deletions test/lib/dropevents/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,20 @@ const (
NumberKey = "NUMBER"
)

// count is only used for SKIP_ALGORITHM=Sequence.
func SkipperAlgorithmWithCount(algorithm string, count uint64) Skipper {
switch algorithm {
case Fibonacci:
return &dropeventsfibonacci.Fibonacci{Prev: 1, Current: 1}

case Sequence:
return dropeventsfirst.First{N: count}

default:
panic("unknown algorithm: " + algorithm)
}
}

func SkipperAlgorithm(algorithm string) Skipper {

switch algorithm {
Expand Down
30 changes: 25 additions & 5 deletions test/lib/recordevents/event_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,36 @@ type EventInfo struct {
Observer string `json:"observer,omitempty"`
Time time.Time `json:"time,omitempty"`
Sequence uint64 `json:"sequence"`
Dropped bool `json:"dropped"`
}

// Pretty print the event. Meant for debugging. This formats the validation error
// or the full event as appropriate. This does NOT format the headers.
// Pretty print the event. Meant for debugging.
func (ei *EventInfo) String() string {
var sb strings.Builder
sb.WriteString("-- EventInfo --\n")
if ei.Event != nil {
return ei.Event.String()
} else {
return fmt.Sprintf("invalid event \"%s\"", ei.Error)
sb.WriteString("--- Event ---\n")
sb.WriteString(ei.Event.String())
sb.WriteRune('\n')
sb.WriteRune('\n')
}
if ei.Error != "" {
sb.WriteString("--- Error ---\n")
sb.WriteString(ei.Error)
sb.WriteRune('\n')
sb.WriteRune('\n')
}
sb.WriteString("--- HTTP headers ---\n")
for k, v := range ei.HTTPHeaders {
sb.WriteString(" " + k + ": " + v[0] + "\n")
}
sb.WriteRune('\n')
sb.WriteString("--- Origin: '" + ei.Origin + "' ---\n")
sb.WriteString("--- Observer: '" + ei.Observer + "' ---\n")
sb.WriteString("--- Time: " + ei.Time.String() + " ---\n")
sb.WriteString(fmt.Sprintf("--- Sequence: %d ---\n", ei.Sequence))
sb.WriteString(fmt.Sprintf("--- Dropped: %v ---\n", ei.Dropped))
return sb.String()
}

// This is mainly used for providing better failure messages
Expand Down
27 changes: 27 additions & 0 deletions test/lib/recordevents/logger_vent/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2020 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package logger_vent

import "knative.dev/eventing/test/lib/recordevents"

type Logger func(string, ...interface{})

func (l Logger) Vent(observed recordevents.EventInfo) error {
l("Event: \n%s", observed.String())

return nil
}
61 changes: 56 additions & 5 deletions test/lib/recordevents/observer/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,16 @@ package observer
import (
"context"
"net/http"
"strings"
"sync/atomic"
"time"

"knative.dev/eventing/test/lib/dropevents"

cloudeventsbindings "github.com/cloudevents/sdk-go/v2/binding"
cloudeventshttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"github.com/kelseyhightower/envconfig"
"go.uber.org/zap"
"knative.dev/pkg/logging"

"knative.dev/eventing/test/lib/recordevents"
Expand All @@ -41,11 +45,12 @@ type Observer struct {
ctx context.Context
seq uint64
replyFunc func(context.Context, http.ResponseWriter, recordevents.EventInfo)
counter *dropevents.CounterHandler
}

type envConfig struct {
// ObserverName is used to identify this instance of the observer.
ObserverName string `envconfig:"OBSERVER_NAME" default:"observer-default" required:"true"`
ObserverName string `envconfig:"POD_NAME" default:"observer-default" required:"true"`

// Reply is used to define if the observer should reply back
Reply bool `envconfig:"REPLY" default:"false" required:"false"`
Expand All @@ -62,6 +67,13 @@ type envConfig struct {
// This string to append in the data field in the reply, if enabled.
// This will threat the data as text/plain field
ReplyAppendData string `envconfig:"REPLY_APPEND_DATA" default:"" required:"false"`

// If events should be dropped, specify the strategy here.
SkipStrategy string `envconfig:"SKIP_ALGORITHM" default:"" required:"false"`

// If events should be dropped according to Linear policy, this controls
// how many events are dropped.
SkipCounter uint64 `envconfig:"SKIP_COUNTER" default:"0" required:"false"`
}

func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observer {
Expand All @@ -80,12 +92,25 @@ func NewFromEnv(ctx context.Context, eventLogs ...recordevents.EventLog) *Observ
logging.FromContext(ctx).Info("Observer won't reply with an event")
replyFunc = NoOpReply
}
var counter *dropevents.CounterHandler

if env.SkipStrategy != "" {
counter = &dropevents.CounterHandler{
Skipper: dropevents.SkipperAlgorithmWithCount(env.SkipStrategy, env.SkipCounter),
}
} else {
counter = &dropevents.CounterHandler{
// Don't skip anything, since count is 0. nop skipper.
Skipper: dropevents.SkipperAlgorithmWithCount(dropevents.Sequence, 0),
}
}

return &Observer{
Name: env.ObserverName,
EventLogs: eventLogs,
ctx: ctx,
replyFunc: replyFunc,
counter: counter,
}
}

Expand Down Expand Up @@ -118,25 +143,51 @@ func (o *Observer) ServeHTTP(writer http.ResponseWriter, request *http.Request)
defer m.Finish(nil)

event, eventErr := cloudeventsbindings.ToEvent(context.TODO(), m)
header := request.Header
headers := make(http.Header)
for k, v := range request.Header {
if !strings.HasPrefix(k, "Ce-") {
headers[k] = v
}
}
// Host header is removed from the request.Header map by net/http
if request.Host != "" {
headers.Set("Host", request.Host)
}

eventErrStr := ""
if eventErr != nil {
eventErrStr = eventErr.Error()
}

shouldSkip := o.counter.Skip()

eventInfo := recordevents.EventInfo{
Error: eventErrStr,
Event: event,
HTTPHeaders: header,
HTTPHeaders: headers,
Origin: request.RemoteAddr,
Observer: o.Name,
Time: time.Now(),
Sequence: atomic.AddUint64(&o.seq, 1),
Dropped: shouldSkip,
}

// We still want to emit the event to make it easier to see what we had oberved, but
// we want to transform it a little bit before emitting so that it does not count
// as the real event that we want to emit.
if shouldSkip {
eventInfo.Event.SetType("dropped-" + eventInfo.Event.Type())
}

err := o.EventLogs.Vent(eventInfo)
if err != nil {
logging.FromContext(o.ctx).Warn("Error while venting the recorded event", err)
logging.FromContext(o.ctx).Fatalw("Error while venting the recorded event", zap.Error(err))
}

o.replyFunc(o.ctx, writer, eventInfo)
if shouldSkip {
// Trigger a redelivery
writer.WriteHeader(http.StatusConflict)
} else {
o.replyFunc(o.ctx, writer, eventInfo)
}
}
95 changes: 9 additions & 86 deletions test/lib/recordevents/recorder_vent/constructor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,15 @@ package recorder_vent
import (
"context"
"log"
"math/rand"
"time"

"github.com/kelseyhightower/envconfig"
"k8s.io/apimachinery/pkg/api/errors"
restclient "k8s.io/client-go/rest"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
ref "k8s.io/client-go/tools/reference"

kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/controller"
"knative.dev/pkg/logging"

"knative.dev/eventing/test/lib/recordevents"
Expand Down Expand Up @@ -70,86 +64,15 @@ func NewEventLog(ctx context.Context, agentName string, podName string, podNames

logging.FromContext(ctx).Infof("Going to send events to pod '%s' in namespace '%s'", on.Name, on.Namespace)

return &recorder{out: createRecorder(ctx, agentName, podNamespace), on: on}
}

func createRecorder(ctx context.Context, agentName string, namespace string) record.EventRecorder {
logger := logging.FromContext(ctx)

recorder := controller.GetEventRecorder(ctx)
if recorder == nil {
// Create event broadcaster
logger.Debug("Creating event broadcaster")
eventBroadcaster := record.NewBroadcaster()
watches := []watch.Interface{
eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof),
eventBroadcaster.StartEventWatcher(
sendToSink(ctx, kubeclient.Get(ctx).CoreV1().Events(namespace).CreateWithEventNamespace),
),
}
recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: agentName})
go func() {
<-ctx.Done()
for _, w := range watches {
w.Stop()
}
logging.FromContext(ctx).Debug("Closed event-broadcaster")
}()
}

return recorder
}

func sendToSink(ctx context.Context, sender func(*corev1.Event) (*corev1.Event, error)) func(*corev1.Event) {
return func(event *corev1.Event) {
tries := 0
for {
if recordEvent(ctx, sender, event) {
break
}
tries++
if tries >= maxRetry {
logging.FromContext(ctx).Errorf("Unable to write event '%s' (retry limit exceeded!)", event.Name)
break
}
// Randomize the first sleep so that various clients won't all be
// synced up if the master goes down.
if tries == 1 {
time.Sleep(time.Duration(float64(sleepDuration) * rand.Float64()))
} else {
time.Sleep(sleepDuration)
}
}
}
}

func recordEvent(ctx context.Context, sender func(*corev1.Event) (*corev1.Event, error), event *corev1.Event) bool {
newEv, err := sender(event)
if err == nil {
logging.FromContext(ctx).Infof("Event '%s' sent correctly, uuid: %s", newEv.Name, newEv.UID)
return true
reference, err := ref.GetReference(scheme.Scheme, on)
if err != nil {
logging.FromContext(ctx).Fatalf("Could not construct reference to: '%#v' due to: '%v'", on, err)
}

// If we can't contact the server, then hold everything while we keep trying.
// Otherwise, something about the event is malformed and we should abandon it.
switch err.(type) {
case *restclient.RequestConstructionError:
// We will construct the request the same next time, so don't keep trying.
logging.FromContext(ctx).Errorf("Unable to construct event '%s': '%v' (will not retry!)", event.Name, err)
return true
case *errors.StatusError:
if errors.IsAlreadyExists(err) {
logging.FromContext(ctx).Infof("Server rejected event '%s': '%v' (will not retry!)", event.Name, err)
} else {
logging.FromContext(ctx).Errorf("Server rejected event '%s': '%v' (will not retry!)", event.Name, err)
}
return true
case *errors.UnexpectedObjectError:
// We don't expect this; it implies the server's response didn't match a
// known pattern. Go ahead and retry.
default:
// This case includes actual http transport errors. Go ahead and retry.
return &recorder{
ctx: ctx,
namespace: podNamespace,
agentName: agentName,
ref: reference,
}
logging.FromContext(ctx).Errorf("Unable to write event: '%v' (may retry after sleeping)", err)
return false
}
Loading

0 comments on commit ba45a11

Please sign in to comment.