diff --git a/Makefile b/Makefile index d7f1b0a438b..4d9e41fa723 100644 --- a/Makefile +++ b/Makefile @@ -10,6 +10,7 @@ TEST= IMAGE= # Guess location of openshift/release repo. NOTE: override this if it is not correct. +# TIP: https://stackoverflow.com/a/29434812/844449 OPENSHIFT=${CURDIR}/../../github.com/openshift/release install: diff --git a/openshift/ci-operator/knative-test-images/wathola-fetcher/Dockerfile b/openshift/ci-operator/knative-test-images/wathola-fetcher/Dockerfile new file mode 100644 index 00000000000..460de3344e0 --- /dev/null +++ b/openshift/ci-operator/knative-test-images/wathola-fetcher/Dockerfile @@ -0,0 +1,5 @@ +# Do not edit! This file was generated via Makefile +FROM openshift/origin-base + +ADD wathola-fetcher /usr/bin/wathola-fetcher +ENTRYPOINT ["/usr/bin/wathola-fetcher"] diff --git a/test/lib/nodes/address.go b/test/lib/nodes/address.go deleted file mode 100644 index 6512ac9e36b..00000000000 --- a/test/lib/nodes/address.go +++ /dev/null @@ -1,68 +0,0 @@ -/* - * Copyright 2020 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// TODO(ksuszyns): remove the whole package after knative/pkg#1001 is merged - -package nodes - -import ( - "sort" - - corev1 "k8s.io/api/core/v1" -) - -// GuessNodeExternalAddress tries to guess external address of a node -func (n *NodesClient) GuessNodeExternalAddress(node *corev1.Node) *corev1.NodeAddress { - sorted := make([]*corev1.NodeAddress, len(node.Status.Addresses)) - for i := 0; i < len(node.Status.Addresses); i++ { - sorted[i] = &node.Status.Addresses[i] - } - sort.Sort(byAddressType{sorted}) - first := sorted[0] - priority := addressTypePriority[first.Type] - if priority >= 2 { - n.logger.Warnf("Chosen address is probably an internal type: %s, "+ - "and might be unaccessible", first.Type) - } - return first -} - -type nodeAddresses []*corev1.NodeAddress - -type byAddressType struct { - nodeAddresses -} - -func (s byAddressType) Len() int { - return len(s.nodeAddresses) -} - -func (s byAddressType) Swap(i, j int) { - s.nodeAddresses[i], s.nodeAddresses[j] = s.nodeAddresses[j], s.nodeAddresses[i] -} - -func (s byAddressType) Less(i, j int) bool { - return addressTypePriority[s.nodeAddresses[i].Type] < - addressTypePriority[s.nodeAddresses[j].Type] -} - -var addressTypePriority = map[corev1.NodeAddressType]int{ - corev1.NodeExternalDNS: 0, - corev1.NodeExternalIP: 1, - corev1.NodeInternalIP: 2, - corev1.NodeInternalDNS: 3, - corev1.NodeHostName: 4, -} diff --git a/test/lib/nodes/address_test.go b/test/lib/nodes/address_test.go deleted file mode 100644 index 1718c59fa6f..00000000000 --- a/test/lib/nodes/address_test.go +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Copyright 2020 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// TODO(ksuszyns): remove the whole package after knative/pkg#1001 is merged - -package nodes - -import ( - "testing" - - corev1 "k8s.io/api/core/v1" - kubernetesfake "k8s.io/client-go/kubernetes/fake" -) - -func TestNodesClientGuessNodeExternalAddress(t *testing.T) { - clientset := kubernetesfake.NewSimpleClientset() - c := Client(clientset, newLogger()) - node := &corev1.Node{ - Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{{ - Type: corev1.NodeInternalIP, - Address: "10.0.2.17", - }, { - Type: corev1.NodeHostName, - Address: "node-17", - }, { - Type: corev1.NodeInternalDNS, - Address: "node-17.europe3.internal", - }, { - Type: corev1.NodeExternalIP, - Address: "35.123.11.234", - }}, - }, - } - - address := c.GuessNodeExternalAddress(node) - - if address.Address != "35.123.11.234" { - t.Errorf("Address: %s want: 35.123.11.234", address.Address) - } -} - -func TestNodesClientGuessNodeExternalAddress_PrivateOnly(t *testing.T) { - clientset := kubernetesfake.NewSimpleClientset() - c := Client(clientset, newLogger()) - node := &corev1.Node{ - Status: corev1.NodeStatus{ - Addresses: []corev1.NodeAddress{{ - Type: corev1.NodeInternalIP, - Address: "10.0.2.17", - }, { - Type: corev1.NodeHostName, - Address: "node-17", - }, { - Type: corev1.NodeInternalDNS, - Address: "node-17.europe3.internal", - }}, - }, - } - - address := c.GuessNodeExternalAddress(node) - - if address.Address != "10.0.2.17" { - t.Errorf("Address: %s want: 10.0.2.17", address.Address) - } -} diff --git a/test/lib/nodes/kind.go b/test/lib/nodes/kind.go deleted file mode 100644 index 7cbe5a34527..00000000000 --- a/test/lib/nodes/kind.go +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Copyright 2020 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// TODO(ksuszyns): remove the whole package after knative/pkg#1001 is merged - -package nodes - -import ( - corev1 "k8s.io/api/core/v1" -) - -const ( - roleLabelFormat = "node-role.kubernetes.io/" -) - -// FilterOutByRole returns a new slice without nodes that have given role -func FilterOutByRole(nodes []corev1.Node, role string) []corev1.Node { - result := make([]corev1.Node, 0, len(nodes)) - for _, node := range nodes { - if !hasRole(role, node) { - result = append(result, node) - } - } - return result -} - -func hasRole(role string, node corev1.Node) bool { - if node.Labels == nil { - return false - } - label := roleLabelFormat + role - _, has := node.Labels[label] - return has -} diff --git a/test/lib/nodes/logger_test.go b/test/lib/nodes/logger_test.go deleted file mode 100644 index 18928392fdf..00000000000 --- a/test/lib/nodes/logger_test.go +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Copyright 2020 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// TODO(ksuszyns): remove the whole package after knative/pkg#1001 is merged - -package nodes - -import ( - "go.uber.org/zap" - "go.uber.org/zap/zapcore" -) - -func newLogger() *zap.SugaredLogger { - z, err := zap.NewDevelopment(zap.AddStacktrace(zapcore.ErrorLevel)) - if err != nil { - panic(err) - } - return z.Sugar() -} diff --git a/test/lib/nodes/nodes.go b/test/lib/nodes/nodes.go deleted file mode 100644 index a4259eca7b2..00000000000 --- a/test/lib/nodes/nodes.go +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2020 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// TODO(ksuszyns): remove the whole package after knative/pkg#1001 is merged - -package nodes - -import ( - "errors" - "math/rand" - - "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" -) - -// NodesClient contains interface for making requests to kubernetes client. -type NodesClient struct { - kube kubernetes.Interface - logger *zap.SugaredLogger -} - -// Client creates a new nodes client -func Client(kube kubernetes.Interface, logger *zap.SugaredLogger) *NodesClient { - return &NodesClient{ - kube: kube, - logger: logger, - } -} - -// RandomWorkerNode gets a worker node randomly -func (n *NodesClient) RandomWorkerNode() (*corev1.Node, error) { - nodes, err := n.kube.CoreV1().Nodes().List(metav1.ListOptions{}) - if err != nil { - return nil, err - } - nodesCount := len(nodes.Items) - if nodesCount == 0 { - return nil, errors.New("fetched 0 nodes, so can't chose a worker") - } - if nodesCount == 1 { - node := nodes.Items[0] - n.logger.Infof("Only one node found (named: %s), returning it as"+ - " it must be a worker (Minikube, CRC)", node.Name) - return &node, nil - } else { - role := "master" - n.logger.Infof("Filtering %d nodes, to not contain role: %s", nodesCount, role) - workers := FilterOutByRole(nodes.Items, role) - n.logger.Infof("Found %d worker(s)", len(workers)) - worker := workers[rand.Intn(len(workers))] - n.logger.Infof("Chosen node: %s", worker.Name) - return &worker, nil - } -} diff --git a/test/lib/nodes/nodes_test.go b/test/lib/nodes/nodes_test.go deleted file mode 100644 index 2d4c49417a1..00000000000 --- a/test/lib/nodes/nodes_test.go +++ /dev/null @@ -1,99 +0,0 @@ -/* - * Copyright 2020 The Knative Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -// TODO(ksuszyns): remove the whole package after knative/pkg#1001 is merged - -package nodes - -import ( - "testing" - - corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - kubernetesfake "k8s.io/client-go/kubernetes/fake" -) - -func TestNodesClientRandomWorkerNode(t *testing.T) { - masterNode := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "master-1", - Labels: map[string]string{ - roleLabelFormat + "master": "", - }, - }, - } - workerNode := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: "worker-1", - }, - } - clientset := kubernetesfake.NewSimpleClientset(masterNode, workerNode) - c := Client(clientset, newLogger()) - - node, err := c.RandomWorkerNode() - - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - nodeName := workerNode.Name - assertProperNodeIsReturned(t, node, nodeName) -} - -func TestNodesClientRandomWorkerNode_OneNode(t *testing.T) { - nodeName := "minikube" - minikube := &corev1.Node{ - ObjectMeta: metav1.ObjectMeta{ - Name: nodeName, - Labels: map[string]string{ - roleLabelFormat + "master": "", - }, - }, - } - clientset := kubernetesfake.NewSimpleClientset(minikube) - c := Client(clientset, newLogger()) - - node, err := c.RandomWorkerNode() - - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - assertProperNodeIsReturned(t, node, nodeName) -} - -func TestNodesClientRandomWorkerNode_NoNode(t *testing.T) { - clientset := kubernetesfake.NewSimpleClientset() - c := Client(clientset, newLogger()) - - _, err := c.RandomWorkerNode() - - if err == nil { - t.Fatal("Expected error, but not received") - } - actual := err.Error() - if actual != "fetched 0 nodes, so can't chose a worker" { - t.Errorf("Unexpected error: %s", actual) - } -} - -func assertProperNodeIsReturned(t *testing.T, node *corev1.Node, nodeName string) { - t.Helper() - if node == nil { - t.Fatal("Expect not to be nil") - } - if node.Name != nodeName { - t.Errorf("Got = %v, want: %s", node, nodeName) - } -} diff --git a/test/test_images/wathola-fetcher/main.go b/test/test_images/wathola-fetcher/main.go new file mode 100644 index 00000000000..5e02a2bcb02 --- /dev/null +++ b/test/test_images/wathola-fetcher/main.go @@ -0,0 +1,25 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "knative.dev/eventing/test/upgrade/prober/wathola/fetcher" +) + +func main() { + fetcher.New().FetchReport() +} diff --git a/test/test_images/wathola-fetcher/main_test.go b/test/test_images/wathola-fetcher/main_test.go new file mode 100644 index 00000000000..b23f4fd8034 --- /dev/null +++ b/test/test_images/wathola-fetcher/main_test.go @@ -0,0 +1,77 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package main + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "net/http/httptest" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "knative.dev/eventing/test/upgrade/prober/wathola/config" + "knative.dev/eventing/test/upgrade/prober/wathola/fetcher" + "knative.dev/eventing/test/upgrade/prober/wathola/receiver" +) + +func TestFetcherMain(t *testing.T) { + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + st := receiver.Report{ + State: "active", + Events: 1456, + Thrown: []string{}, + } + bytes, err := json.Marshal(st) + if err != nil { + t.Fatal(err) + } + _, err = w.Write(bytes) + if err != nil { + t.Fatal(err) + } + })) + oldTarget := config.Instance.Forwarder.Target + config.Instance.Forwarder.Target = ts.URL + defer ts.Close() + defer func() { + config.Instance.Forwarder.Target = oldTarget + }() + + stdout := func() []byte { + rescueStdout := os.Stdout + r, w, _ := os.Pipe() + os.Stdout = w + + main() + + _ = w.Close() + out, _ := ioutil.ReadAll(r) + os.Stdout = rescueStdout + return out + }() + + exec := fetcher.Execution{} + err := json.Unmarshal(stdout, &exec) + if err != nil { + t.Fatal(err) + } + + assert.Equal(t, 1456, exec.Report.Events) + assert.Equal(t, "active", exec.Report.State) +} diff --git a/test/test_images/wathola-fetcher/pod.yaml b/test/test_images/wathola-fetcher/pod.yaml new file mode 100644 index 00000000000..d6377098a31 --- /dev/null +++ b/test/test_images/wathola-fetcher/pod.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Pod +metadata: + name: wathola-fetcher +spec: + containers: + - name: wathola-fetcher + image: ko://knative.dev/eventing/test/test_images/wathola-fetcher diff --git a/test/upgrade/README.md b/test/upgrade/README.md index e39a9566157..db3d8f27219 100644 --- a/test/upgrade/README.md +++ b/test/upgrade/README.md @@ -54,34 +54,39 @@ we run a prober test that continually sends events to a service during the entire upgrade/downgrade process. When the upgrade completes, we make sure that all of those events propagated just once. -To achieve that a tool was prepared (https://github.com/cardil/wathola). It -consists of 3 components: _sender_, _forwarder_, and _receiver_. _Sender_ is the -usual Kubernetes pod that publishes events to the default `broker` with given -interval. When it closes (by either `SIGTERM`, or `SIGINT`), an `finished` event -is generated. _Forwarder_ is a knative serving service that scales from zero to -receive the requested traffic. _Forwarders_ receive events and forwards them to -given target. _Receiver_ is an ordinary pod that collects events from multiple -forwarders and has an endpoint `/report` that can be polled to get the status of -sent events. +To achieve that a [wathola tool](test/upgrade/prober/wathola) was prepared. It +consists of 4 components: _sender_, _forwarder_, _receiver_, and _fetcher_. +_Sender_ is the usual Kubernetes deployment that publishes events to the default +`broker` with given interval. When it closes (by either `SIGTERM`, or `SIGINT`), +an `finished` event is generated. _Forwarder_ is a knative serving service that +scales from zero to receive the requested traffic. _Forwarders_ receive events +and forwards them to given target. _Receiver_ is an ordinary deployment that +collects events from multiple forwarders and has an endpoint `/report` that can +be polled to get the status of sent events. To fetch the report from within the +cluster _fetcher_ comes in. It's a simple one time job, that will fetch report +from _receiver_ and print it on stdout as JSON. That enable test client to +download _fetcher_ logs and parse JSON to get the report. Diagram below describe the setup: ``` - (pod) (ksvc) (pod) -+--------+ +-----------+ +----------+ -| | | ++ | | -| Sender | +-->| Forwarder ||----->+ Receiver | -| | | | || | | -+---+----+ | +------------| +----------+ - | | +-----------+ - | | - | | - | +--+-----+ +---------+ - +-----> | | +-+ - | Broker | < - - | Trigger | | - | | | | | - +--------+ +---------+ | - (default) +----------+ + K8s cluster | Test machine + | + (deploym.) (ksvc) (deploym.) | ++--------+ +-----------+ +----------+ | +------------+ +| | | ++ | | | | | +| Sender | +-->| Forwarder ||----->+ Receiver | | + TestProber | +| | | | || | |<---+ | | | ++---+----+ | +------------| +----------+ | | +------------+ + | | +-----------+ | | + | | | | + | | +---------+ | + | +--+-----+ +---------+ | | | + +-----> | | +-+ + Fetcher | | + | Broker | < - > | Trigger | | | | | + | | | | | +---------+ | + +--------+ +---------+ | (job) | + (default) +----------+ | ``` #### Probe test configuration diff --git a/test/upgrade/prober/prober.go b/test/upgrade/prober/prober.go index 0a11372ab10..cb6eba0b5df 100644 --- a/test/upgrade/prober/prober.go +++ b/test/upgrade/prober/prober.go @@ -34,7 +34,7 @@ var ( // Prober is the interface for a prober, which checks the result of the probes when stopped. type Prober interface { // Verify will verify prober state after finished has been send - Verify() ([]error, int) + Verify(ctx context.Context) ([]error, int) // Finish send finished event Finish() @@ -61,7 +61,7 @@ func AssertEventProber(t *testing.T, prober Prober) { waitAfterFinished(prober) - errors, events := prober.Verify() + errors, events := prober.Verify(ctx) if len(errors) == 0 { t.Logf("All %d events propagated well", events) } else { diff --git a/test/upgrade/prober/verify.go b/test/upgrade/prober/verify.go index 7db5f69a434..d5f785bcb16 100644 --- a/test/upgrade/prober/verify.go +++ b/test/upgrade/prober/verify.go @@ -16,26 +16,29 @@ package prober import ( - "bytes" + "context" "encoding/json" "errors" "fmt" - "net/http" - "strings" + "time" "github.com/wavesoftware/go-ensure" "go.uber.org/zap" + batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" - "knative.dev/eventing/test/lib/nodes" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/kubernetes" + "knative.dev/eventing/test/upgrade/prober/wathola/fetcher" + "knative.dev/eventing/test/upgrade/prober/wathola/receiver" + pkgTest "knative.dev/pkg/test" + "knative.dev/pkg/test/logging" ) +const fetcherName = "wathola-fetcher" + func (p *prober) Verify() ([]error, int) { - nc := nodes.Client(p.client.Kube.Kube, p.log) - node, err := nc.RandomWorkerNode() - ensure.NoError(err) - address := nc.GuessNodeExternalAddress(node) - p.log.Debugf("Address resolved: %v, type: %v", address.Address, address.Type) - report := fetchReceiverReport(address, p.log) + report := p.fetchReport() p.log.Infof("Fetched receiver report. Events propagated: %v. "+ "State: %v", report.Events, report.State) if report.State == "active" { @@ -52,26 +55,132 @@ func (p *prober) Finish() { p.removeSender() } -func fetchReceiverReport(address *corev1.NodeAddress, log *zap.SugaredLogger) *Report { - u := fmt.Sprintf("http://%s:%d/report", address.Address, receiverNodePort) - log.Infof("Fetching receiver report from: %v", u) - resp, err := http.Get(u) +func (p *prober) fetchReport() *receiver.Report { + exec := p.fetchExecution() + replayLogs(p.log, exec) + return exec.Report +} + +func replayLogs(log *zap.SugaredLogger, exec *fetcher.Execution) { + for _, entry := range exec.Logs { + logFunc := log.Error + switch entry.Level { + case "debug": + logFunc = log.Debug + case "info": + logFunc = log.Info + case "warning": + logFunc = log.Warn + } + logFunc("Fetcher: ", entry.Datetime, " ", entry.Message) + } +} + +func (p *prober) fetchExecution() *fetcher.Execution { + ns := p.config.Namespace + p.deployFetcher() + defer p.deleteFetcher() + pod := p.succeededJobPod(fetcherName, ns) + bytes, err := p.client.Kube.PodLogs(pod.Name, fetcherName, ns) + ensure.NoError(err) + ex := &fetcher.Execution{ + Logs: []fetcher.LogEntry{}, + Report: &receiver.Report{ + State: "failure", + Events: 0, + Thrown: []string{"Report wasn't fetched"}, + }, + } + err = json.Unmarshal(bytes, ex) ensure.NoError(err) - if resp.StatusCode != 200 { - var b strings.Builder - ensure.NoError(resp.Header.Write(&b)) - headers := b.String() - panic(fmt.Errorf("could not get receiver report at %v, "+ - "status code: %v, headers: %v", u, resp.StatusCode, headers)) + return ex +} + +func (p *prober) deployFetcher() { + p.log.Info("Deploying fetcher job: ", fetcherName) + jobs := p.client.Kube.Kube.BatchV1().Jobs(p.config.Namespace) + var replicas int32 = 1 + fetcherJob := &batchv1.Job{ + ObjectMeta: metav1.ObjectMeta{ + Name: fetcherName, + Namespace: p.client.Namespace, + }, + Spec: batchv1.JobSpec{ + Completions: &replicas, + Parallelism: &replicas, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{ + "app": fetcherName, + }, + }, + Spec: corev1.PodSpec{ + RestartPolicy: corev1.RestartPolicyNever, + Volumes: []corev1.Volume{{ + Name: p.config.ConfigMapName, + VolumeSource: corev1.VolumeSource{ + ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{ + Name: p.config.ConfigMapName, + }, + }, + }, + }}, + Containers: []corev1.Container{{ + Name: fetcherName, + Image: pkgTest.ImagePath(fetcherName), + VolumeMounts: []corev1.VolumeMount{{ + Name: p.config.ConfigMapName, + ReadOnly: true, + MountPath: p.config.ConfigMountPoint, + }}, + }}, + }, + }, + }, } - buf := new(bytes.Buffer) - _, err = buf.ReadFrom(resp.Body) + _, err := jobs.Create(fetcherJob) + ensure.NoError(err) + p.log.Info("Waiting for fetcher job to succeed: ", fetcherName) + err = waitForJobIsCompleted(p.client.Kube.Kube, fetcherName, p.config.Namespace) ensure.NoError(err) - ensure.NoError(resp.Body.Close()) - jsonBytes := buf.Bytes() - var report Report - ensure.NoError(json.Unmarshal(jsonBytes, &report)) - return &report +} + +func (p *prober) deleteFetcher() { + ns := p.config.Namespace + jobs := p.client.Kube.Kube.BatchV1().Jobs(ns) + err := jobs.Delete(fetcherName, &metav1.DeleteOptions{}) + ensure.NoError(err) +} + +func (p *prober) succeededJobPod(jobName, namespace string) *corev1.Pod { + pods := p.client.Kube.Kube.CoreV1().Pods(namespace) + podList, err := pods.List(metav1.ListOptions{ + LabelSelector: fmt.Sprint("job-name=", jobName), + }) + ensure.NoError(err) + for _, pod := range podList.Items { + if pod.Status.Phase == corev1.PodSucceeded { + return &pod + } + } + return nil +} + +func waitForJobIsCompleted(client kubernetes.Interface, jobName, namespace string) error { + jobs := client.BatchV1().Jobs(namespace) + + span := logging.GetEmitableSpan(context.TODO(), fmt.Sprint("waitForJobIsCompleted/", jobName)) + defer span.End() + + return wait.PollImmediate(time.Second, 2*time.Minute, func() (bool, error) { + j, err := jobs.Get(jobName, metav1.GetOptions{}) + if err != nil { + return true, err + } + + return j.Status.Succeeded == *j.Spec.Completions, nil + }) } // Report represents a receiver JSON report diff --git a/test/upgrade/prober/wathola/fetcher/operations.go b/test/upgrade/prober/wathola/fetcher/operations.go new file mode 100644 index 00000000000..6bef6a309c6 --- /dev/null +++ b/test/upgrade/prober/wathola/fetcher/operations.go @@ -0,0 +1,125 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fetcher + +import ( + "encoding/json" + "io/ioutil" + "net/http" + "net/url" + "os" + + "github.com/wavesoftware/go-ensure" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "knative.dev/eventing/test/upgrade/prober/wathola/config" + "knative.dev/eventing/test/upgrade/prober/wathola/receiver" +) + +// New will create a new fetcher +func New() Fetcher { + ml := &memoryLogger{ + Execution: &Execution{ + Logs: make([]LogEntry, 0), + Report: nil, + }, + } + config.Log = newLogger(ml) + config.ReadIfPresent() + return &fetcher{ + out: os.Stdout, + log: ml, + } +} + +// FetchReport will try to fetch report from remote receiver service and dump +// it on the output as JSON. +func (f *fetcher) FetchReport() { + err := f.fetchReport() + if err != nil { + config.Log.Error(err) + } + f.ensureStatePrinted() +} + +func (f *fetcher) fetchReport() error { + u, err := url.Parse(config.Instance.Forwarder.Target) + if err != nil { + return err + } + u.Path = "/report" + req, err := http.NewRequest("GET", u.String(), nil) + if err != nil { + return err + } + req.Header.Add("content-type", "application/json") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + body, err := ioutil.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return err + } + state := &receiver.Report{ + Thrown: []string{}, + } + err = json.Unmarshal(body, state) + if err != nil { + return err + } + f.log.Report = state + return nil +} + +func (f *fetcher) ensureStatePrinted() { + bytes, err := json.MarshalIndent(f.log.Execution, "", " ") + ensure.NoError(err) + _, err = f.out.Write(bytes) + ensure.NoError(err) + _, err = f.out.Write([]byte("\n")) + ensure.NoError(err) +} + +func (m *memoryLogger) Write(p []byte) (int, error) { + le := LogEntry{} + err := json.Unmarshal(p, &le) + if err != nil { + return 0, err + } + m.Logs = append(m.Logs, le) + return len(p), nil +} + +func (m *memoryLogger) Sync() error { + return nil +} + +func newLogger(ws zapcore.WriteSyncer) *zap.SugaredLogger { + encoderCfg := zapcore.EncoderConfig{ + MessageKey: "msg", + TimeKey: "dt", + LevelKey: "level", + NameKey: "logger", + EncodeLevel: zapcore.LowercaseLevelEncoder, + EncodeTime: zapcore.RFC3339NanoTimeEncoder, + EncodeDuration: zapcore.StringDurationEncoder, + } + core := zapcore.NewCore(zapcore.NewJSONEncoder(encoderCfg), ws, zap.DebugLevel) + return zap.New(core).Sugar() +} diff --git a/test/upgrade/prober/wathola/fetcher/types.go b/test/upgrade/prober/wathola/fetcher/types.go new file mode 100644 index 00000000000..5dc98682c49 --- /dev/null +++ b/test/upgrade/prober/wathola/fetcher/types.go @@ -0,0 +1,52 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fetcher + +import ( + "os" + "time" + + "knative.dev/eventing/test/upgrade/prober/wathola/receiver" +) + +// Fetcher will fetch a report from remote receiver service. +type Fetcher interface { + FetchReport() +} + +// Execution represents a response from remote receiver service and local logs +// that were received. +type Execution struct { + Logs []LogEntry `json:"logs"` + Report *receiver.Report `json:"report"` +} + +// LogEntry represents a single +type LogEntry struct { + Level string `json:"level"` + Datetime time.Time `json:"dt"` + Message string `json:"msg"` +} + +type fetcher struct { + out *os.File + log *memoryLogger +} + +type memoryLogger struct { + *Execution +} diff --git a/test/upgrade/prober/wathola/receiver/services.go b/test/upgrade/prober/wathola/receiver/services.go index 62be402d5eb..02a7ace8021 100644 --- a/test/upgrade/prober/wathola/receiver/services.go +++ b/test/upgrade/prober/wathola/receiver/services.go @@ -100,7 +100,7 @@ func (r reportHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { s := r.receiver.finished.State() errs := r.receiver.finished.Thrown() events := r.receiver.step.Count() - sj := &StateJSON{ + sj := &Report{ State: stateToString(s), Events: events, Thrown: errs, @@ -128,10 +128,3 @@ func stateToString(state event.State) string { panic(fmt.Sprintf("unknown state: %v", state)) } } - -// StateJSON represents state as JSON -type StateJSON struct { - State string `json:"state"` - Events int `json:"events"` - Thrown []string `json:"thrown"` -} diff --git a/test/upgrade/prober/wathola/receiver/types.go b/test/upgrade/prober/wathola/receiver/types.go new file mode 100644 index 00000000000..5980923c415 --- /dev/null +++ b/test/upgrade/prober/wathola/receiver/types.go @@ -0,0 +1,24 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package receiver + +// Report represents state as JSON +type Report struct { + State string `json:"state"` + Events int `json:"events"` + Thrown []string `json:"thrown"` +}