Skip to content

Commit

Permalink
LL-HLS stress test (#2)
Browse files Browse the repository at this point in the history
* Initial LL-HLS test commit
* Review fixes
* Connection manager initial commit (#3)
* update paths
* Review fixes pt2
* add moduledoc
* update gitignore
* add readme stub
* fix typo
* Review fixes pt3
* Backoff if track manifest request fails
* add missing send
  • Loading branch information
sgfn authored Oct 5, 2023
1 parent 28cfd5e commit 5f491e4
Show file tree
Hide file tree
Showing 12 changed files with 613 additions and 9 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -177,3 +177,6 @@ $RECYCLE.BIN/
*.lnk

# End of https://www.gitignore.io/api/c,vim,linux,macos,elixir,windows,visualstudiocode

# Test results default filename
results.csv
8 changes: 5 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
# Jellygrinder

TODO
Utility for running HLS and LL-HLS stress-tests against [the Jellyfish Media Server](https://github.com/jellyfish-dev/jellyfish)

## Installation

TODO
Make sure to have installed [Elixir](https://elixir-lang.org/install.html) first.

Run `mix deps.get`.

## Usage

TODO
Run `mix help lltest` for usage information.

## Copyright and License

Expand Down
3 changes: 0 additions & 3 deletions lib/jellygrinder.ex

This file was deleted.

16 changes: 16 additions & 0 deletions lib/jellygrinder/application.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
defmodule Jellygrinder.Application do
@moduledoc false

use Application
alias Jellygrinder.{ClientSupervisor, Coordinator}

@impl true
def start(_mode, _opts) do
children = [
ClientSupervisor,
Coordinator
]

Supervisor.start_link(children, strategy: :one_for_one)
end
end
26 changes: 26 additions & 0 deletions lib/jellygrinder/client_supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule Jellygrinder.ClientSupervisor do
@moduledoc false

use DynamicSupervisor
alias Jellygrinder.LLClient

@spec start_link(term()) :: Supervisor.on_start()
def start_link(arg) do
DynamicSupervisor.start_link(__MODULE__, arg, name: __MODULE__)
end

@spec spawn_client(module(), term()) :: DynamicSupervisor.on_start_child()
def spawn_client(client_module \\ LLClient, arg) do
DynamicSupervisor.start_child(__MODULE__, {client_module, arg})
end

@spec terminate() :: :ok
def terminate() do
DynamicSupervisor.stop(__MODULE__)
end

@impl true
def init(_arg) do
DynamicSupervisor.init(strategy: :one_for_one)
end
end
151 changes: 151 additions & 0 deletions lib/jellygrinder/coordinator.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
defmodule Jellygrinder.Coordinator do
@moduledoc false

use GenServer, restart: :temporary

require Logger

alias Jellygrinder.ClientSupervisor
alias Jellygrinder.Coordinator.Config

@spec run_test(Config.t()) :: :ok | no_return()
def run_test(config) do
ref = Process.monitor(__MODULE__)
GenServer.call(__MODULE__, {:run_test, config})

receive do
{:DOWN, ^ref, :process, _pid, reason} ->
if reason != :normal,
do: Logger.error("Coordinator process exited with reason #{inspect(reason)}")

:ok
end
end

@spec start_link(term()) :: GenServer.on_start()
def start_link(_args) do
GenServer.start_link(__MODULE__, nil, name: __MODULE__)
end

@impl true
def init(_args) do
Logger.info("Coordinator: Init")

{:ok, nil}
end

@impl true
def handle_call({:run_test, config}, _from, _state) do
config = Config.fill_hls_url!(config)

Logger.info("""
Coordinator: Start of test
URL: #{config.url}
Clients: #{config.clients}
Time: #{config.time} s
Save results to: #{config.out_path}
""")

Process.send_after(self(), :end_test, config.time * 1000)
send(self(), :spawn_client)

state = %{
uri: URI.parse(config.url),
clients: config.clients,
time: config.time,
spawn_interval: config.spawn_interval,
out_path: config.out_path,
client_count: 0,
results: []
}

{:reply, :ok, state}
end

@impl true
def handle_cast({:result, r}, %{results: results} = state) do
r = amend_result(r, state)

unless r.success do
Logger.warning(
"Coordinator: Request failed (from: #{r.process_name}, label: #{r.label}, code: #{r.response_code})"
)
end

{:noreply, %{state | results: [r | results]}}
end

@impl true
def handle_info(:spawn_client, %{client_count: max_clients, clients: max_clients} = state) do
{:noreply, state}
end

@impl true
def handle_info(:spawn_client, %{client_count: client_count} = state) do
Process.send_after(self(), :spawn_client, state.spawn_interval)
name = "client-#{client_count}"

case ClientSupervisor.spawn_client(%{uri: state.uri, name: name}) do
{:ok, pid} ->
Logger.info("Coordinator: #{name} spawned at #{inspect(pid)}")
_ref = Process.monitor(pid)

{:noreply, %{state | client_count: client_count + 1}}

{:error, reason} ->
Logger.error("Coordinator: Error spawning #{name}: #{inspect(reason)}")

{:noreply, state}
end
end

@impl true
def handle_info(:end_test, %{results: results, out_path: out_path} = state) do
Logger.info("Coordinator: End of test")

ClientSupervisor.terminate()

Logger.info("Coordinator: Generating report...")

results =
results
|> Enum.reverse()
|> Enum.map_join("", &serialize_result/1)

Logger.info("Coordinator: Saving generated report to #{out_path}...")
File.write!(out_path, results_header() <> results)
Logger.info("Coordinator: Report saved successfully. Exiting")

{:stop, :normal, state}
end

@impl true
def handle_info({:DOWN, _ref, :process, pid, reason}, %{client_count: client_count} = state) do
Logger.warning("Coordinator: Child process #{inspect(pid)} died: #{inspect(reason)}")

{:noreply, %{state | client_count: client_count - 1}}
end

@impl true
def handle_info(msg, state) do
Logger.warning("Coordinator: Received unexpected message: #{inspect(msg)}")

{:noreply, state}
end

defp amend_result(result, %{client_count: client_count, uri: uri} = _state) do
request_url = uri |> Map.put(:path, result.path) |> URI.to_string()

result
|> Map.put(:client_count, client_count)
|> Map.put(:url, request_url)
end

defp results_header() do
"timeStamp,elapsed,label,responseCode,responseMessage,threadName,dataType,success,failureMessage,bytes,sentBytes,grpThreads,allThreads,URL,Latency,IdleTime,Connect\n"
end

defp serialize_result(r) do
"#{r.timestamp},#{r.elapsed},#{r.label},#{r.response_code},,#{r.process_name},,#{r.success},#{r.failure_msg},#{r.bytes},-1,#{r.client_count},#{r.client_count},#{r.url},-1,-1,-1\n"
end
end
49 changes: 49 additions & 0 deletions lib/jellygrinder/coordinator/config.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
defmodule Jellygrinder.Coordinator.Config do
@moduledoc false

@default_client_config [
server_address: "localhost:5002",
server_api_token: "development",
secure?: false
]

@type t :: %__MODULE__{
client_config: Jellyfish.Client.connection_options(),
url: String.t() | nil,
clients: pos_integer(),
time: pos_integer(),
spawn_interval: pos_integer(),
out_path: Path.t()
}

defstruct client_config: @default_client_config,
url: nil,
clients: 500,
time: 300,
spawn_interval: 200,
out_path: "results.csv"

@spec fill_hls_url!(t()) :: t() | no_return()
def fill_hls_url!(%{url: nil} = config) do
client_config = Keyword.merge(@default_client_config, config.client_config)
client = Jellyfish.Client.new(client_config)

case Jellyfish.Room.get_all(client) do
{:ok, [room | _rest]} ->
protocol = if client_config[:secure?], do: "https", else: "http"

%{
config
| url: "#{protocol}://#{client_config[:server_address]}/hls/#{room.id}/index.m3u8"
}

{:ok, []} ->
raise "No rooms present on Jellyfish"

{:error, reason} ->
raise "Error communicating with Jellyfish: #{inspect(reason)}"
end
end

def fill_hls_url!(config), do: config
end
Loading

0 comments on commit 5f491e4

Please sign in to comment.