wrtc-load-tester: Fix viewership metrics from tests #329

Merged: 21 commits, Nov 15, 2023
2 changes: 1 addition & 1 deletion cmd/webrtc-load-tester/gcloud/jobs.go
@@ -56,7 +56,7 @@ func CreateJob(ctx context.Context, spec JobSpec) (job *runpb.Job, exec *runpb.E
"webrtc-load-tester": "true",
"load-test-id": spec.TestID,
}
glog.Infof("Creating job: %s", jobName)
glog.Infof("Creating job name=%s cpu=%d memory=%d tasks=%d", jobName, spec.CPUs, spec.MemoryMiB, spec.NumTasks)

parent := fmt.Sprintf("projects/%s/locations/%s", projectID, spec.Region)
createOp, err := jobsClient.CreateJob(ctx, &runpb.CreateJobRequest{
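For context, the values in the new log line come straight from the JobSpec being created. For a player job under the orchestrator defaults shown further down in this PR (100 viewers in a region, 10 viewers per worker, 2 viewers per CPU, 300 MiB per viewer) the output would look roughly like the line below; the job name itself is an illustrative assumption, not taken from this diff.

Creating job name=load-test-player-us-central1 cpu=6 memory=3072 tasks=10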
82 changes: 56 additions & 26 deletions cmd/webrtc-load-tester/roles/orchestrator.go
@@ -5,13 +5,13 @@ import (
"flag"
"fmt"
"math"
"net/url"
"os"
"path"
"strconv"
"time"

"github.com/golang/glog"
"github.com/google/uuid"
"github.com/livepeer/go-api-client"
"github.com/livepeer/stream-tester/cmd/webrtc-load-tester/gcloud"
"github.com/livepeer/stream-tester/cmd/webrtc-load-tester/utils"
@@ -39,12 +39,15 @@ type loadTestArguments struct {
InputFile string
}
Playback struct {
BaseURL string
ManifestURL string
RegionViewersJSON map[string]int
ViewersPerWorker int
MemoryPerViewerMiB int
DelayBetweenRegions time.Duration
BaseURL string
RegionViewersJSON map[string]int
ViewersPerWorker int
ViewersPerCPU float64
MemoryPerViewerMiB int
DelayBetweenRegions time.Duration
BaseScreenshotFolderOS *url.URL
ScreenshotPeriod time.Duration
ManifestURL string
}
}

@@ -59,7 +62,7 @@ func Orchestrator() {
fs.StringVar(&cliFlags.TestID, "test-id", "", "ID of previous test to recover. If not provided, a new test will be started with a random ID")
fs.StringVar(&cliFlags.GoogleCredentialsJSON, "google-credentials-json", "", "Google Cloud service account credentials JSON with access to Cloud Run")
fs.StringVar(&cliFlags.GoogleProjectID, "google-project-id", "livepeer-test", "Google Cloud project ID")
fs.StringVar(&cliFlags.ContainerImage, "container-image", "livepeer/webrtc-load-tester:vg-feat-webrtc-load-tester", "Container image to use for the worker jobs")
fs.StringVar(&cliFlags.ContainerImage, "container-image", "livepeer/webrtc-load-tester:master", "Container image to use for the worker jobs")

fs.DurationVar(&cliFlags.TestDuration, "duration", 10*time.Minute, "How long to run the test")

@@ -70,9 +73,12 @@ func Orchestrator() {
fs.StringVar(&cliFlags.Playback.BaseURL, "playback-base-url", "https://monster.lvpr.tv/", "Base URL for the player page")
fs.StringVar(&cliFlags.Playback.ManifestURL, "playback-manifest-url", "", "URL for playback")
utils.JSONVarFlag(fs, &cliFlags.Playback.RegionViewersJSON, "playback-region-viewers-json", `{"us-central1":100,"europe-west2":100}`, "JSON object of Google Cloud regions to the number of viewers that should be simulated there. Notice that the values must be multiples of playback-viewers-per-worker, and up to 1000 x that")
fs.IntVar(&cliFlags.Playback.ViewersPerWorker, "playback-viewers-per-worker", 50, "Number of viewers to simulate per worker")
fs.IntVar(&cliFlags.Playback.MemoryPerViewerMiB, "playback-memory-per-viewer-mib", 100, "Amount of memory to allocate per viewer (browser tab)")
fs.IntVar(&cliFlags.Playback.ViewersPerWorker, "playback-viewers-per-worker", 10, "Number of viewers to simulate per worker")
fs.Float64Var(&cliFlags.Playback.ViewersPerCPU, "playback-viewers-per-cpu", 2, "Number of viewers to allocate per CPU on player jobs")
fs.IntVar(&cliFlags.Playback.MemoryPerViewerMiB, "playback-memory-per-viewer-mib", 300, "Amount of memory to allocate per viewer (browser tab)")
fs.DurationVar(&cliFlags.Playback.DelayBetweenRegions, "playback-delay-between-regions", 1*time.Minute, "How long to wait between starting jobs on different regions")
utils.URLVarFlag(fs, &cliFlags.Playback.BaseScreenshotFolderOS, "playback-base-screenshot-folder-os", "", "Object Store URL for a folder where to save screenshots of the player. If unset, no screenshots will be taken")
fs.DurationVar(&cliFlags.Playback.ScreenshotPeriod, "playback-screenshot-period", 1*time.Minute, "How often to take a screenshot of the player")

fs.StringVar(&cliFlags.APIToken, "api-token", "", "Token of the Livepeer API to be used")
fs.StringVar(&cliFlags.APIServer, "api-server", "livepeer.monster", "Server of the Livepeer API to be used")
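As a rough usage sketch of the new knobs registered above (only the flag names come from this change; the object-store URL, its scheme and the credentials placeholder are assumptions), an orchestrator run with screenshots enabled might pass:

-duration 10m \
-playback-region-viewers-json '{"us-central1":100,"europe-west2":100}' \
-playback-viewers-per-worker 10 \
-playback-viewers-per-cpu 2 \
-playback-memory-per-viewer-mib 300 \
-playback-base-screenshot-folder-os 's3+https://KEY:SECRET@storage.example.com/wrtc-screenshots' \
-playback-screenshot-period 1m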
@@ -128,18 +134,20 @@ func initClients(cliFlags loadTestArguments) {
}

func runLoadTest(ctx context.Context, args loadTestArguments) error {
args.TestID = uuid.New().String()
glog.Infof("Starting new test with ID %s", args.TestID)
wait(ctx, 5*time.Second)

stream, err := studioApi.CreateStream(api.CreateStreamReq{
Name: "webrtc-load-test-" + args.TestID,
Name: "webrtc-load-test-" + time.Now().UTC().Format(time.RFC3339),
})
if err != nil {
return fmt.Errorf("failed to create stream: %w", err)
}

glog.Infof("Stream created: %s", stream.ID)

// Use the stream ID as the test ID for simplicity. Helps on recovering a running test as well.
args.TestID = stream.ID
glog.Infof("Starting new test with ID %s", args.TestID)
wait(ctx, 5*time.Second)

glog.Infof("Access the stream at: https://%s", path.Join(args.APIServer, "/dashboard/streams", stream.ID))

_, streamer, err := gcloud.CreateJob(ctx, streamerJobSpec(args, stream.StreamKey))
@@ -198,7 +206,7 @@ func recoverLoadTest(ctx context.Context, args loadTestArguments) error {
}
}

waitTestFinished(ctx, "", executions)
waitTestFinished(ctx, args.TestID, executions)

return nil
}
@@ -276,25 +284,47 @@ func playerJobSpec(args loadTestArguments, region string, viewers int, playbackI
playbackURL = fmt.Sprintf(args.Playback.ManifestURL, playbackID)
}

jobArgs := []string{
"-base-url", args.Playback.BaseURL,
"-playback-id", playbackID,
"-playback-url", playbackURL,
"-simultaneous", strconv.Itoa(simultaneous),
"-duration", args.TestDuration.String(),
}
if args.Playback.BaseScreenshotFolderOS != nil {
jobArgs = append(jobArgs,
"-screenshot-folder-os", args.Playback.BaseScreenshotFolderOS.JoinPath(args.TestID, region).String(),
"-screenshot-period", args.Playback.ScreenshotPeriod.String(),
)
}

return gcloud.JobSpec{
Region: region,

ContainerImage: args.ContainerImage,
Role: "player",
Args: []string{
"-base-url", args.Playback.BaseURL,
"-playback-id", playbackID,
"-playback-url", playbackURL,
"-simultaneous", strconv.Itoa(simultaneous),
"-duration", args.TestDuration.String(),
},
Timeout: timeout,
Args: jobArgs,
Timeout: timeout,

TestID: args.TestID,
NumTasks: numTasks,
CPUs: int(math.Ceil(float64(simultaneous) / 50)), // 50 viewers per CPU
MemoryMiB: int(math.Ceil(float64(simultaneous*args.Playback.MemoryPerViewerMiB)/512) * 512), // Round up to 512MB increments
CPUs: allowedCPUValue(float64(simultaneous) / args.Playback.ViewersPerCPU), // Defaults to 1 CPU every 2 viewers
MemoryMiB: int(math.Ceil(float64(simultaneous*args.Playback.MemoryPerViewerMiB)/128) * 128), // Round up to 128MB increments
}
}

// allowedCPUValue returns the first allowed value for the CPU equal or higher than the requested value. The allowed
// values for the CPU are [1 2 4 6 8]. Cloud Run supports values <1 but we don't.
func allowedCPUValue(viewersPerCPU float64) int {
allowedValues := []int{1, 2, 4, 6, 8}
cpuInt := int(math.Ceil(viewersPerCPU))

for _, v := range allowedValues {
if v >= cpuInt {
return v
}
}
return allowedValues[len(allowedValues)-1]
}

func wait(ctx context.Context, dur time.Duration) {
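A minimal, runnable sketch of the sizing math introduced above, evaluated at the new default flag values (10 viewers per worker, 2 viewers per CPU, 300 MiB per viewer); this is a worked example, not part of the diff:

package main

import (
	"fmt"
	"math"
)

// allowedCPUValue mirrors the helper added in orchestrator.go: round the
// requested CPU count up to the next value Cloud Run accepts (we only use >= 1).
func allowedCPUValue(cpus float64) int {
	allowed := []int{1, 2, 4, 6, 8}
	n := int(math.Ceil(cpus))
	for _, v := range allowed {
		if v >= n {
			return v
		}
	}
	return allowed[len(allowed)-1]
}

func main() {
	simultaneous, viewersPerCPU, memPerViewerMiB := 10, 2.0, 300

	cpus := allowedCPUValue(float64(simultaneous) / viewersPerCPU)            // ceil(10/2) = 5, next allowed value is 6
	memMiB := int(math.Ceil(float64(simultaneous*memPerViewerMiB)/128) * 128) // 3000 MiB rounded up to 3072 MiB

	fmt.Println(cpus, memMiB) // prints: 6 3072
}

Compared with the previous hard-coded 50 viewers per CPU, 100 MiB per viewer and 512 MiB rounding, the new defaults give each browser tab considerably more headroom.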
106 changes: 98 additions & 8 deletions cmd/webrtc-load-tester/roles/player.go
@@ -1,23 +1,33 @@
package roles

import (
"bytes"
"context"
"flag"
"fmt"
"log"
"net/url"
"os"
"strings"
"time"

"github.com/chromedp/chromedp"
"github.com/golang/glog"
"github.com/livepeer/go-tools/drivers"
"github.com/livepeer/stream-tester/cmd/webrtc-load-tester/utils"
)

const uploadScreenshotTimeout = 10 * time.Second

type playerArguments struct {
BaseURL string
PlaybackID, PlaybackURL string // only one will be used, playbackURL takes precedence
Simultaenous uint
Simultaneous uint
PlayerStartInterval time.Duration
TestDuration time.Duration

ScreenshotFolderOS *url.URL
ScreenshotPeriod time.Duration
}

func Player() {
@@ -27,8 +37,11 @@ func Player() {
fs.StringVar(&cliFlags.BaseURL, "base-url", "https://lvpr.tv/", "Base URL for the player")
fs.StringVar(&cliFlags.PlaybackID, "playback-id", "", "Playback ID to use for the player")
fs.StringVar(&cliFlags.PlaybackURL, "playback-url", "", "Playback URL to use for the player. Will override any playback-id value")
fs.UintVar(&cliFlags.Simultaenous, "simultaneous", 1, "How many players to run simultaneously")
fs.UintVar(&cliFlags.Simultaneous, "simultaneous", 1, "How many players to run simultaneously")
fs.DurationVar(&cliFlags.PlayerStartInterval, "player-start-interval", 2*time.Second, "How often to wait between starting the simultaneous players")
fs.DurationVar(&cliFlags.TestDuration, "duration", 1*time.Minute, "How long to run the test")
utils.URLVarFlag(fs, &cliFlags.ScreenshotFolderOS, "screenshot-folder-os", "", "Object Store URL for a folder where to save screenshots of the player. If unset, no screenshots will be taken")
fs.DurationVar(&cliFlags.ScreenshotPeriod, "screenshot-period", 1*time.Minute, "How often to take a screenshot of the player")
})

if cliFlags.PlaybackID == "" && cliFlags.PlaybackURL == "" {
@@ -57,22 +70,28 @@ func runPlayerTest(args playerArguments) {
os.Exit(1)
}

errs := make(chan error, args.Simultaenous)
for i := uint(0); i < args.Simultaenous; i++ {
errs := make(chan error, args.Simultaneous)
for i := uint(0); i < args.Simultaneous; i++ {
i := i // avoid go's loop variable capture

if args.PlayerStartInterval > 0 {
time.Sleep(args.PlayerStartInterval)
}

go func() {
errs <- runSinglePlayerTest(ctx, args)
errs <- runSinglePlayerTest(ctx, args, i)
}()
}

for i := uint(0); i < args.Simultaenous; i++ {
for i := uint(0); i < args.Simultaneous; i++ {
err := <-errs
if err != nil {
glog.Errorf("Routine finished with error: %v\n", err)
}
}
}

func runSinglePlayerTest(ctx context.Context, args playerArguments) error {
func runSinglePlayerTest(ctx context.Context, args playerArguments, idx uint) error {
url, err := buildPlayerUrl(args.BaseURL, args.PlaybackID, args.PlaybackURL)
if err != nil {
return err
@@ -83,11 +102,67 @@ func runSinglePlayerTest(ctx context.Context, args playerArguments) error {

tasks := chromedp.Tasks{
chromedp.Navigate(url),
chromedp.Sleep(args.TestDuration),
}

if args.ScreenshotFolderOS == nil {
tasks = append(tasks, chromedp.Sleep(args.TestDuration))
} else {
osFolder := args.ScreenshotFolderOS.
JoinPath(getHostname(), fmt.Sprintf("player-%d", idx)).
String()

driver, err := drivers.ParseOSURL(osFolder, true)
if err != nil {
return err
}
storage := driver.NewSession("")

// grab an initial screenshot
tasks = append(tasks, uploadScreenshot(storage, screenshotName(0, 0)))

numPics := int(args.TestDuration / args.ScreenshotPeriod)
for picIdx := 1; picIdx <= numPics; picIdx++ {
tasks = append(tasks, chromedp.Sleep(args.ScreenshotPeriod))

screenshotTime := time.Duration(picIdx) * args.ScreenshotPeriod
name := screenshotName(picIdx, screenshotTime)
tasks = append(tasks, uploadScreenshot(storage, name))
}

if remaining := args.TestDuration % args.ScreenshotPeriod; remaining != 0 {
tasks = append(tasks, chromedp.Sleep(remaining))
name := screenshotName(numPics+1, args.TestDuration)
tasks = append(tasks, uploadScreenshot(storage, name))
}
}

return chromedp.Run(ctx, tasks)
}

func uploadScreenshot(storage drivers.OSSession, name string) chromedp.ActionFunc {
return chromedp.ActionFunc(func(ctx context.Context) error {
var picBuf []byte
screenshotAction := chromedp.FullScreenshot(&picBuf, 50)

if err := screenshotAction.Do(ctx); err != nil {
return err
}

go func() {
_, err := storage.SaveData(ctx, name, bytes.NewBuffer(picBuf), nil, uploadScreenshotTimeout)
if err != nil {
glog.Errorf("Error uploading screenshot: %v\n", err)
}
}()
return nil
})
}

func screenshotName(idx int, d time.Duration) string {
timeStr := time.Time{}.Add(d).Format("15h04m05s")
return fmt.Sprintf("screenshot-%03d-%s.jpg", idx, timeStr)
}

func buildPlayerUrl(baseURL, playbackID, playbackURL string) (string, error) {
url, err := url.Parse(baseURL)
if err != nil {
@@ -106,3 +181,18 @@ func buildPlayerUrl(baseURL, playbackID, playbackURL string) (string, error) {

return url.String(), nil
}

func getHostname() string {
execution := os.Getenv("CLOUD_RUN_EXECUTION")
taskIndex := os.Getenv("CLOUD_RUN_TASK_INDEX")

if execution != "" && taskIndex != "" {
// extract only the final hash on the execution name
parts := strings.Split(execution, "-")
hash := parts[len(parts)-1]
return fmt.Sprintf("exec-%s-task-%s", hash, taskIndex)
}

hostname, _ := os.Hostname()
return hostname
}
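To make the screenshot plumbing concrete, here is a minimal sketch that reuses the naming helper from the diff; the 10m30s duration and 1-minute period are assumptions. Each player takes an initial capture, one capture per period, and a final capture for the leftover 30 seconds, uploading them under <screenshot-folder>/<hostname>/player-<idx>/. The upload itself runs in a goroutine, so a slow object store delays only the upload and not the chromedp sleep schedule.

package main

import (
	"fmt"
	"time"
)

// screenshotName mirrors the helper above: zero-padded index plus the elapsed
// time rendered as hours, minutes and seconds.
func screenshotName(idx int, d time.Duration) string {
	timeStr := time.Time{}.Add(d).Format("15h04m05s")
	return fmt.Sprintf("screenshot-%03d-%s.jpg", idx, timeStr)
}

func main() {
	duration, period := 10*time.Minute+30*time.Second, time.Minute

	fmt.Println(screenshotName(0, 0)) // screenshot-000-00h00m00s.jpg (initial capture)

	numPics := int(duration / period) // 10 periodic captures, one per minute
	for picIdx := 1; picIdx <= numPics; picIdx++ {
		fmt.Println(screenshotName(picIdx, time.Duration(picIdx)*period)) // ... up to screenshot-010-00h10m00s.jpg
	}

	// Leftover 30s: one last capture at the full test duration.
	if remaining := duration % period; remaining != 0 {
		fmt.Println(screenshotName(numPics+1, duration)) // screenshot-011-00h10m30s.jpg
	}
}

On Cloud Run, getHostname keys the per-task folder by execution hash and task index (for example exec-abc12-task-3, hypothetical values), so parallel tasks never overwrite each other's screenshots.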
9 changes: 8 additions & 1 deletion cmd/webrtc-load-tester/roles/streamer.go
@@ -72,10 +72,17 @@ func runStreamerTest(ctx context.Context, args streamerArguments) error {
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr

if err := cmd.Run(); err != nil {
if err := cmd.Start(); err != nil {
return fmt.Errorf("failed to start ffmpeg: %w", err)
}

if err := cmd.Wait(); err != nil {
if err.Error() == "signal: killed" {
return nil
}
return fmt.Errorf("ffmpeg exited with error: %w", err)
}

return nil
}
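The streamer now splits cmd.Run into Start and Wait so that the kill arriving when the test winds down (for instance from a context deadline or the Cloud Run job being torn down) is reported as a clean shutdown rather than an error. A minimal sketch of the same pattern, with a sleep command standing in for ffmpeg; the command and durations are illustrative:

package main

import (
	"context"
	"fmt"
	"os/exec"
	"time"
)

func main() {
	// The context deadline stands in for the load-test duration.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// "sleep 60" stands in for the long-running ffmpeg process.
	cmd := exec.CommandContext(ctx, "sleep", "60")

	if err := cmd.Start(); err != nil {
		fmt.Println("failed to start:", err)
		return
	}

	if err := cmd.Wait(); err != nil {
		// CommandContext sends SIGKILL once the context expires, which Wait
		// reports as "signal: killed"; treat that as a normal end of the test.
		if err.Error() == "signal: killed" {
			fmt.Println("test duration reached, process stopped cleanly")
			return
		}
		fmt.Println("exited with error:", err)
		return
	}
	fmt.Println("exited normally")
}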
