Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Collect logs from all containers and add log to test properties. #306

Merged
merged 6 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion tools/cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func main() {
var p time.Duration
var retries uint
var deleteSuccessfulTests bool
var logURLPrefix string

flag.Var(&i, "i", "input files containing load test configurations")
flag.StringVar(&o, "o", "", "name of the output file for xunit xml report")
Expand All @@ -44,6 +45,7 @@ func main() {
flag.DurationVar(&p, "polling-interval", 20*time.Second, "polling interval for load test status")
flag.UintVar(&retries, "polling-retries", 2, "Maximum retries in case of communication failure")
flag.BoolVar(&deleteSuccessfulTests, "delete-successful-tests", false, "Delete tests immediately in case of successful termination")
flag.StringVar(&logURLPrefix, "log-url-prefix", "", "prefix for log urls")
flag.Parse()

inputConfigs, err := runner.DecodeFromFiles(i)
Expand Down Expand Up @@ -75,8 +77,11 @@ func main() {
log.Printf("Test counts per queue: %v", runner.CountConfigs(configQueueMap))
log.Printf("Queue concurrency levels: %v", c)
log.Printf("Output directories: %v", outputDirMap)
if logURLPrefix != "" {
log.Printf("Prefix for url for saved logs: %s", logURLPrefix)
wanlin31 marked this conversation as resolved.
Show resolved Hide resolved
}

r := runner.NewRunner(runner.NewLoadTestGetter(), runner.AfterIntervalFunction(p), retries, deleteSuccessfulTests, runner.NewLogSaver(runner.NewPodsGetter()))
r := runner.NewRunner(runner.NewLoadTestGetter(), runner.AfterIntervalFunction(p), retries, deleteSuccessfulTests, runner.NewPodsGetter(), logURLPrefix)

logPrefixFmt := runner.LogPrefixFmt(configQueueMap)

Expand Down
17 changes: 17 additions & 0 deletions tools/runner/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package runner

import (
"context"
"errors"
"log"
"os"
"strings"
Expand All @@ -36,6 +38,7 @@ import (

grpcv1 "github.com/grpc/test-infra/api/v1"
clientset "github.com/grpc/test-infra/clientset"
"github.com/grpc/test-infra/status"
)

// NewLoadTestGetter returns a client to interact with LoadTest resources. The
Expand Down Expand Up @@ -85,6 +88,20 @@ func NewPodsGetter() corev1types.PodsGetter {
return clientset.CoreV1()
}

// GetTestPods retrieves the pods associated with a LoadTest.
func GetTestPods(ctx context.Context, loadTest *grpcv1.LoadTest, podsGetter corev1types.PodsGetter) ([]*corev1.Pod, error) {
podLister := podsGetter.Pods(metav1.NamespaceAll)
// Get a list of all pods
podList, err := podLister.List(ctx, metav1.ListOptions{})
if err != nil {
return nil, errors.New("Failed to fetch list of pods")
wanlin31 marked this conversation as resolved.
Show resolved Hide resolved
}

// Get pods just for this specific test
testPods := status.PodsForLoadTest(loadTest, podList.Items)
return testPods, nil
}

// getKubernetesConfig retrieves the kubernetes configuration.
func getKubernetesConfig() *rest.Config {
config, err := rest.InClusterConfig()
Expand Down
141 changes: 51 additions & 90 deletions tools/runner/logsaver.go
Original file line number Diff line number Diff line change
@@ -1,98 +1,88 @@
/*
Copyright 2021 gRPC authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package runner

import (
"bytes"
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"

grpcv1 "github.com/grpc/test-infra/api/v1"
"github.com/grpc/test-infra/status"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
corev1types "k8s.io/client-go/kubernetes/typed/core/v1"
)

// LogSaver provides functionality to save pod logs to files.
type LogSaver struct {
	// podsGetter is the client used to list pods and to request their logs.
	podsGetter corev1types.PodsGetter
}

// NewLogSaver builds a LogSaver that performs its pod operations through
// podsGetter.
func NewLogSaver(podsGetter corev1types.PodsGetter) *LogSaver {
	saver := new(LogSaver)
	saver.podsGetter = podsGetter
	return saver
}

// SavePodLogs saves pod logs to files with same name as pod.
// This function returns a map where pods are keys and values are the filepath
// of the saved log.
func (ls *LogSaver) SavePodLogs(ctx context.Context, loadTest *grpcv1.LoadTest, podLogDir string) (*SavedLogs, error) {
savedLogs := NewSavedLogs()

// Get pods for this test
pods, err := ls.getTestPods(ctx, loadTest)
if err != nil {
return savedLogs, err
}
// SaveLogs saves container logs to files whose names have the format
// pod-name-container-name.log.
// This function returns a list of pointers to LogInfo.
func SaveLogs(ctx context.Context, loadTest *grpcv1.LoadTest, pods []*corev1.Pod, podsGetter corev1types.PodsGetter, podLogDir string) ([]*LogInfo, error) {
logInfos := []*LogInfo{}
wanlin31 marked this conversation as resolved.
Show resolved Hide resolved

// Attempt to create directory. Will not error if directory already exists
err = os.MkdirAll(podLogDir, os.ModePerm)
err := os.MkdirAll(podLogDir, os.ModePerm)
if err != nil {
return savedLogs, fmt.Errorf("Failed to create pod log output directory %s: %v", podLogDir, err)
return logInfos, fmt.Errorf("Failed to create pod log output directory %s: %v", podLogDir, err)
}

// Write logs to files
for _, pod := range pods {
logFilePath := filepath.Join(podLogDir, pod.Name+".log")
buffer, err := ls.getPodLogBuffer(ctx, pod)
if err != nil {
return savedLogs, fmt.Errorf("could not get log from pod: %s", err)
}
err = ls.writeBufferToFile(buffer, logFilePath)
if err != nil {
return savedLogs, fmt.Errorf("could not write pod log buffer to file: %s", err)
}
savedLogs.podToPathMap[pod] = logFilePath
}
return savedLogs, nil
}
for _, container := range pod.Spec.Containers {
logBuffer, err := GetLogBuffer(ctx, pod, podsGetter, container.Name)

// getTestPods retrieves the pods associated with a LoadTest.
func (ls *LogSaver) getTestPods(ctx context.Context, loadTest *grpcv1.LoadTest) ([]*corev1.Pod, error) {
podLister := ls.podsGetter.Pods(metav1.NamespaceAll)
if err != nil {
return logInfos, fmt.Errorf("could not get log from pod: %s", err)
wanlin31 marked this conversation as resolved.
Show resolved Hide resolved
}

// Get a list of all pods
podList, err := podLister.List(ctx, metav1.ListOptions{})
if err != nil {
return nil, errors.New("Failed to fetch list of pods")
}
if logBuffer.Len() == 0 {
continue
}

logFilePath := filepath.Join(podLogDir, fmt.Sprintf("%s-%s.log", pod.Name, container.Name))
logInfo := NewLogInfo(PodNameElement(pod.Name, loadTest.Name), container.Name, logFilePath)

err = writeBufferToFile(logBuffer, logFilePath)
if err != nil {
return logInfos, fmt.Errorf("could not write %s container in %s pod log buffer to file: %s", logInfo.containerName, pod.Name, err)
}

// Get pods just for this specific test
testPods := status.PodsForLoadTest(loadTest, podList.Items)
return testPods, nil
logInfos = append(logInfos, logInfo)
}
}
return logInfos, nil
}

func (ls *LogSaver) getPodLogBuffer(ctx context.Context, pod *corev1.Pod) (*bytes.Buffer, error) {
req := ls.podsGetter.Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{})
driverLogs, err := req.Stream(ctx)
// GetLogBuffer retrieves logs from a specific container
wanlin31 marked this conversation as resolved.
Show resolved Hide resolved
// in the given pod and return the log buffer.
func GetLogBuffer(ctx context.Context, pod *corev1.Pod, podsGetter corev1types.PodsGetter, containerName string) (*bytes.Buffer, error) {
req := podsGetter.Pods(pod.Namespace).GetLogs(pod.Name, &corev1.PodLogOptions{Container: containerName})
containerLogs, err := req.Stream(ctx)
if err != nil {
return nil, err
}
defer driverLogs.Close()

defer containerLogs.Close()
logBuffer := new(bytes.Buffer)
logBuffer.ReadFrom(driverLogs)

logBuffer.ReadFrom(containerLogs)
return logBuffer, nil
}

func (ls *LogSaver) writeBufferToFile(buffer *bytes.Buffer, filePath string) error {
func writeBufferToFile(buffer *bytes.Buffer, filePath string) error {
// Don't write empty buffers
if buffer.Len() == 0 {
return nil
Expand All @@ -114,32 +104,3 @@ func (ls *LogSaver) writeBufferToFile(buffer *bytes.Buffer, filePath string) err

return nil
}

// SavedLogs adds functions to get information about saved pod logs.
type SavedLogs struct {
	// podToPathMap maps each pod to the path of the file its log was saved to.
	podToPathMap map[*corev1.Pod]string
}

// NewSavedLogs returns an empty SavedLogs whose pod-to-path map is already
// allocated and ready for writes.
func NewSavedLogs() *SavedLogs {
	saved := new(SavedLogs)
	saved.podToPathMap = map[*corev1.Pod]string{}
	return saved
}

// GenerateProperties builds the pod-log related properties for loadTest. Every
// pod with a saved log contributes one entry: the key is the pod's "name"
// property name and the value is the pod name.
func (sl *SavedLogs) GenerateProperties(loadTest *grpcv1.LoadTest) map[string]string {
	result := map[string]string{}
	for savedPod := range sl.podToPathMap {
		key := sl.podToPropertyName(savedPod.Name, loadTest.Name, "name")
		result[key] = savedPod.Name
	}
	return result
}

// podToPropertyName returns the property name "pod.<suffix>.<elementName>",
// where <suffix> is podName with a leading "<loadTestName>-" removed.
func (sl *SavedLogs) podToPropertyName(podName, loadTestName, elementName string) string {
	suffix := strings.TrimPrefix(podName, loadTestName+"-")
	return "pod." + suffix + "." + elementName
}
74 changes: 74 additions & 0 deletions tools/runner/properties.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
Copyright 2022 gRPC authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package runner

import (
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
)

// LogInfo contains information for a log entry.
type LogInfo struct {
	// podNameElement is the pod name with the load test name prefix removed,
	// such as client-0, driver-0 or server-0.
	podNameElement string
	// containerName is the name of the container the log was read from.
	containerName string
	// logPath is the path of the file the log was saved to.
	logPath string
}

// NewLogInfo creates a pointer of new LogInfo object.
func NewLogInfo(podNameElement string, containerName string, logPath string) *LogInfo {
wanlin31 marked this conversation as resolved.
Show resolved Hide resolved
return &LogInfo{
podNameElement: podNameElement,
containerName: containerName,
logPath: logPath,
}
}

// PodLogProperties creates container log property name to
wanlin31 marked this conversation as resolved.
Show resolved Hide resolved
// container log link map.
func PodLogProperties(logInfos []*LogInfo, logURLPrefix string) map[string]string {
properties := make(map[string]string)
for _, logInfo := range logInfos {
prefix := []string{logInfo.podNameElement, "name", "log", logInfo.containerName}
wanlin31 marked this conversation as resolved.
Show resolved Hide resolved
podLogPropertyKey := strings.Join(prefix, ".")
url := logURLPrefix + logInfo.logPath
properties[podLogPropertyKey] = url
}
return properties
}

// PodNameElement trims the load test name off the pod name and returns only
// the remaining element of the pod name, such as client-0, driver-0 or
// server-0. A pod name that does not start with "<loadTestName>-" is returned
// unchanged.
func PodNameElement(podName, loadTestName string) string {
	prefix := fmt.Sprintf("%s-", loadTestName)
	return strings.TrimPrefix(podName, prefix)
}

// PodNameProperties creates pod property name to pod name map.
func PodNameProperties(pods []*corev1.Pod, loadTestName string) map[string]string {
properties := make(map[string]string)
for _, pod := range pods {
prefix := []string{PodNameElement(pod.Name, loadTestName), "name"}
wanlin31 marked this conversation as resolved.
Show resolved Hide resolved
podNamePropertyKey := strings.Join(prefix, ".")
properties[podNamePropertyKey] = pod.Name
}

return properties
}
25 changes: 19 additions & 6 deletions tools/runner/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (

grpcv1 "github.com/grpc/test-infra/api/v1"
clientset "github.com/grpc/test-infra/clientset"
corev1types "k8s.io/client-go/kubernetes/typed/core/v1"
)

// AfterIntervalFunction returns a function that stops for a time interval.
Expand All @@ -50,18 +51,21 @@ type Runner struct {
// deleteSuccessfulTests determines whether tests that terminate without
// errors should be deleted immediately.
deleteSuccessfulTests bool
// logSaver saves pod log files
logSaver *LogSaver
// podsGetter obtains a list of pods
wanlin31 marked this conversation as resolved.
Show resolved Hide resolved
podsGetter corev1types.PodsGetter
// logURLPrefix is used to calculate the link to the log saved in placer
logURLPrefix string
}

// NewRunner creates a new Runner object.
func NewRunner(loadTestGetter clientset.LoadTestGetter, afterInterval func(), retries uint, deleteSuccessfulTests bool, logSaver *LogSaver) *Runner {
func NewRunner(loadTestGetter clientset.LoadTestGetter, afterInterval func(), retries uint, deleteSuccessfulTests bool, podsGetter corev1types.PodsGetter, logURLPrefix string) *Runner {
return &Runner{
loadTestGetter: loadTestGetter,
afterInterval: afterInterval,
retries: retries,
deleteSuccessfulTests: deleteSuccessfulTests,
logSaver: logSaver,
podsGetter: podsGetter,
logURLPrefix: logURLPrefix,
}
}

Expand Down Expand Up @@ -141,14 +145,23 @@ func (r *Runner) runTest(ctx context.Context, config *grpcv1.LoadTest, reporter
status = statusString(config)
switch {
case loadTest.Status.State.IsTerminated():
savedLogs, err := r.logSaver.SavePodLogs(ctx, loadTest, outputDir)
pods, err := GetTestPods(ctx, loadTest, r.podsGetter)
if err != nil {
reporter.Error("Could not list all pods belongs to %s: %s", loadTest.Name, err)
}
savedLogInfos, err := SaveLogs(ctx, loadTest, pods, r.podsGetter, outputDir)
wanlin31 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
reporter.Error("Could not save pod logs: %s", err)
}
reporter.AddProperty("name", loadTest.Name)
for property, value := range savedLogs.GenerateProperties(loadTest) {
for property, value := range PodNameProperties(pods, loadTest.Name) {
reporter.AddProperty(property, value)
}

for property, value := range PodLogProperties(savedLogInfos, r.logURLPrefix) {
reporter.AddProperty(property, value)
}

if status != "Succeeded" {
reporter.Error("Test failed with reason %q: %v", loadTest.Status.Reason, loadTest.Status.Message)
} else {
Expand Down