diff --git a/.credo.exs b/.credo.exs index f9cc8d32..85a588e9 100644 --- a/.credo.exs +++ b/.credo.exs @@ -176,7 +176,7 @@ {Credo.Check.Refactor.DoubleBooleanNegation, false}, {Credo.Check.Refactor.ModuleDependencies, false}, {Credo.Check.Refactor.NegatedIsNil, false}, - {Credo.Check.Refactor.PipeChainStart, false}, + {Credo.Check.Refactor.PipeChainStart, []}, {Credo.Check.Refactor.VariableRebinding, false}, {Credo.Check.Warning.LeakyEnvironment, false}, {Credo.Check.Warning.MapGetUnsafePass, false}, diff --git a/config/runtime.exs b/config/runtime.exs index 00178356..16fe659c 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -88,6 +88,12 @@ case ConfigReader.read_ssl_config() do config :fishjam, FishjamWeb.Endpoint, http: [ip: ip, port: port] end +config :fishjam, + feature_flags: [ + custom_room_name_disabled: + ConfigReader.read_boolean("FJ_FEATURE_FLAG_CUSTOM_ROOM_NAME_DISABLED") || false + ] + check_origin = ConfigReader.read_check_origin("FJ_CHECK_ORIGIN") if check_origin != nil do diff --git a/lib/fishjam/application.ex b/lib/fishjam/application.ex index 4560ac4c..966f755b 100644 --- a/lib/fishjam/application.ex +++ b/lib/fishjam/application.ex @@ -117,6 +117,7 @@ defmodule Fishjam.Application do defp ensure_epmd_started!() do try do {_output, 0} = System.cmd("epmd", ["-daemon"]) + # credo:disable-for-next-line :ok = Task.async(&ensure_epmd_running/0) |> Task.await(@epmd_timeout) :ok diff --git a/lib/fishjam/component/file.ex b/lib/fishjam/component/file.ex index 70aa6f04..e16f3123 100644 --- a/lib/fishjam/component/file.ex +++ b/lib/fishjam/component/file.ex @@ -61,7 +61,8 @@ defmodule Fishjam.Component.File do defp validate_file_path(file_path) do base_path = - Application.fetch_env!(:fishjam, :media_files_path) + :fishjam + |> Application.fetch_env!(:media_files_path) |> Path.join(@files_location) |> Path.expand() diff --git a/lib/fishjam/component/hls/recording.ex b/lib/fishjam/component/hls/recording.ex index b2e05e45..3e55189c 100644 --- a/lib/fishjam/component/hls/recording.ex +++ b/lib/fishjam/component/hls/recording.ex @@ -45,7 +45,8 @@ defmodule Fishjam.Component.HLS.Recording do end defp root_directory() do - Application.fetch_env!(:fishjam, :media_files_path) + :fishjam + |> Application.fetch_env!(:media_files_path) |> Path.join(@recordings_folder) |> Path.expand() end @@ -58,7 +59,10 @@ defmodule Fishjam.Component.HLS.Recording do end defp do_delete(id) do - directory(id) |> File.rm_rf!() + id + |> directory() + |> File.rm_rf!() + :ok end end diff --git a/lib/fishjam/component/hls/request_handler.ex b/lib/fishjam/component/hls/request_handler.ex index e8bbd303..796b2659 100644 --- a/lib/fishjam/component/hls/request_handler.ex +++ b/lib/fishjam/component/hls/request_handler.ex @@ -64,7 +64,11 @@ defmodule Fishjam.Component.HLS.RequestHandler do def handle_recording_request(recording_id, filename) do with :ok <- Recording.validate_recording(recording_id) do recording_path = Recording.directory(recording_id) - file_path = Path.join(recording_path, filename) |> Path.expand() + + file_path = + recording_path + |> Path.join(filename) + |> Path.expand() if PathValidation.inside_directory?(file_path, recording_path), do: File.read(file_path), diff --git a/lib/fishjam/feature_flags.ex b/lib/fishjam/feature_flags.ex new file mode 100644 index 00000000..4d11286c --- /dev/null +++ b/lib/fishjam/feature_flags.ex @@ -0,0 +1,15 @@ +defmodule Fishjam.FeatureFlags do + @moduledoc """ + Module to resolve any feature flags, since we are not using database we can't use fun_with_flags. + Because of that we base feature flags on the environment variables mainly. + """ + + @doc """ + Flag for disabling custom room names, which will be replaced by the generated based on the node name. + + Introduced: 28/05/2024 + Removal: Once we move on to generated room_ids permanently. + """ + def custom_room_name_disabled?, + do: Application.get_env(:fishjam, :feature_flags)[:custom_room_name_disabled] +end diff --git a/lib/fishjam/room/config.ex b/lib/fishjam/room/config.ex index 1f2cf028..5cf1fb4b 100644 --- a/lib/fishjam/room/config.ex +++ b/lib/fishjam/room/config.ex @@ -2,6 +2,9 @@ defmodule Fishjam.Room.Config do @moduledoc """ Room configuration """ + + alias Fishjam.Room.ID + @enforce_keys [ :room_id, :max_peers, @@ -37,7 +40,7 @@ defmodule Fishjam.Room.Config do peerless_purge_timeout = Map.get(params, "peerlessPurgeTimeout") peer_disconnected_timeout = Map.get(params, "peerDisconnectedTimeout") - with {:ok, room_id} <- parse_room_id(room_id), + with {:ok, room_id} <- ID.generate(room_id), :ok <- validate_max_peers(max_peers), {:ok, video_codec} <- codec_to_atom(video_codec), :ok <- validate_webhook_url(webhook_url), @@ -55,18 +58,6 @@ defmodule Fishjam.Room.Config do end end - defp parse_room_id(nil), do: {:ok, UUID.uuid4()} - - defp parse_room_id(room_id) when is_binary(room_id) do - if Regex.match?(~r/^[a-zA-Z0-9-_]+$/, room_id) do - {:ok, room_id} - else - {:error, :invalid_room_id} - end - end - - defp parse_room_id(_room_id), do: {:error, :invalid_room_id} - defp validate_max_peers(nil), do: :ok defp validate_max_peers(max_peers) when is_integer(max_peers) and max_peers >= 0, do: :ok defp validate_max_peers(_max_peers), do: {:error, :invalid_max_peers} diff --git a/lib/fishjam/room/id.ex b/lib/fishjam/room/id.ex new file mode 100644 index 00000000..5c16f521 --- /dev/null +++ b/lib/fishjam/room/id.ex @@ -0,0 +1,91 @@ +defmodule Fishjam.Room.ID do + @moduledoc """ + This module allows to generate room_id with the node name in it. + """ + + @type id :: String.t() + + @doc """ + Based on the Room ID determines to which node it belongs to. + Returns an error if the node isn't present in the cluster. + + DISCLAIMER: + It should be used only with room_ids generated by generate/0, otherwise it can raise. + """ + @spec determine_node(id()) :: + {:ok, node()} | {:error, :invalid_room_id} | {:error, :invalid_node} + def determine_node(room_id) do + with {:ok, room_id} <- validate_room_id(room_id), + node_name <- decode_node_name(room_id), + true <- node_present_in_cluster?(node_name) do + {:ok, node_name} + else + {:error, :invalid_room_id} -> {:error, :invalid_room_id} + false -> {:error, :invalid_node} + end + end + + @doc """ + Room ID structure resembles the one of the UUID, although the last part is replaced by encoded node name. + + ## Example: + For node_name: "fishjam@10.0.0.1" + + iex> Fishjam.Room.ID.generate() + "da2e-4a75-95ff-776bad2caf04-666973686a616d4031302e302e302e31" + """ + @spec generate() :: id() + def generate do + UUID.uuid4() + |> String.split("-") + |> Enum.take(-4) + |> Enum.concat([encoded_node_name()]) + |> Enum.join("-") + end + + @doc """ + Depending on feature flag "custom_room_name_disabled" + - uses `generate/0` to generate room_id + or + - parses the `room_id` provided by the client + """ + @spec generate(nil | String.t()) :: {:ok, id()} | {:error, :invalid_room_id} + def generate(nil), do: generate(UUID.uuid4()) + + def generate(room_id) do + if Fishjam.FeatureFlags.custom_room_name_disabled?() do + {:ok, generate()} + else + validate_room_id(room_id) + end + end + + defp decode_node_name(room_id) do + room_id + |> String.split("-") + |> Enum.take(-1) + |> Enum.at(0) + |> Base.decode16!(case: :lower) + |> String.to_existing_atom() + end + + defp encoded_node_name do + Node.self() + |> Atom.to_string() + |> Base.encode16(case: :lower) + end + + defp node_present_in_cluster?(node) do + node in [Node.self() | Node.list()] + end + + defp validate_room_id(room_id) when is_binary(room_id) do + if Regex.match?(~r/^[a-zA-Z0-9-_]+$/, room_id) do + {:ok, room_id} + else + {:error, :invalid_room_id} + end + end + + defp validate_room_id(_room_id), do: {:error, :invalid_room_id} +end diff --git a/lib/fishjam/room_service.ex b/lib/fishjam/room_service.ex index 81f117de..b1ad68b0 100644 --- a/lib/fishjam/room_service.ex +++ b/lib/fishjam/room_service.ex @@ -7,7 +7,9 @@ defmodule Fishjam.RoomService do require Logger - alias Fishjam.{Event, Room, WebhookNotifier} + alias Fishjam.Event + alias Fishjam.Room + alias Fishjam.WebhookNotifier @metric_interval_in_seconds Application.compile_env!(:fishjam, :room_metrics_scrape_interval) @metric_interval_in_milliseconds @metric_interval_in_seconds * 1_000 @@ -58,31 +60,14 @@ defmodule Fishjam.RoomService do @spec create_room(Room.Config.t()) :: {:ok, Room.t(), String.t()} | {:error, atom()} def create_room(config) do - {node_resources, failed_nodes} = :rpc.multicall(Fishjam.RoomService, :get_resource_usage, []) - - if Enum.count(failed_nodes) > 0 do - Logger.warning( - "Couldn't get resource usage of the following nodes. Reason: nodes don't exist. Nodes: #{inspect(failed_nodes)}" - ) - end - - {failed_rpcs, node_resources} = - Enum.split_with(node_resources, fn - {:badrpc, _info} -> true - _other -> false - end) - - unless Enum.empty?(failed_rpcs) do - Logger.warning("These RPC calls fail: #{inspect(failed_rpcs)}") - end - - min_node = find_best_node(node_resources) - - if Enum.count(node_resources) > 1 do - Logger.info("Node with least used resources is #{inspect(min_node)}") - GenServer.call({__MODULE__, min_node}, {:create_room, config}) - else - GenServer.call(__MODULE__, {:create_room, config}) + case Fishjam.RPCClient.multicall(Fishjam.RoomService, :get_resource_usage, []) do + [_only_self_resources] -> + GenServer.call(__MODULE__, {:create_room, config}) + + nodes_resources -> + min_node = find_best_node(nodes_resources) + Logger.info("Node with least used resources is #{inspect(min_node)}") + GenServer.call({__MODULE__, min_node}, {:create_room, config}) end end diff --git a/lib/fishjam/rpc_client.ex b/lib/fishjam/rpc_client.ex new file mode 100644 index 00000000..047973de --- /dev/null +++ b/lib/fishjam/rpc_client.ex @@ -0,0 +1,54 @@ +defmodule Fishjam.RPCClient do + @moduledoc """ + This modules serves as simple RPC client to communicate with other nodes in cluster. + It utilizes the Enhanced version of Erlang `rpc` called `erpc`. + + Enhanced version allows to distinguish between returned value, raised exceptions, and other errors. + `erpc` also has better performance and scalability than the original rpc implementation. + """ + require Logger + + @doc """ + Executes mfa on a remote node. + Function returns {:ok, result} tuple only if the execution succeeded. + In case of any exceptions we are catching them logging and returning simple :error atom. + """ + @spec call(node(), module(), atom(), term(), timeout()) :: {:ok, term()} | :error + def call(node, module, function, args, timeout \\ :infinity) do + try do + {:ok, :erpc.call(node, module, function, args, timeout)} + rescue + e -> + Logger.warning("RPC call to node #{node} failed with exception: #{inspect(e)}") + :error + end + end + + @doc """ + Multicall to all nodes in the cluster, including this node. + It filters out any errors or exceptions from return so you may end up with empty list. + """ + @spec multicall(module(), atom(), term(), timeout()) :: list(term) + def multicall(module, function, args, timeout \\ :infinity) do + nodes() + |> :erpc.multicall(module, function, args, timeout) + |> handle_result() + end + + defp handle_result(result) when is_list(result) do + result + |> Enum.reduce([], fn + {:ok, res}, acc -> + [res | acc] + + {status, res}, acc -> + Logger.warning( + "RPC multicall to one of the nodes failed with status: #{inspect(status)} because of: #{inspect(res)}" + ) + + acc + end) + end + + defp nodes, do: [Node.self() | Node.list()] +end diff --git a/test/fishjam/room/id_test.exs b/test/fishjam/room/id_test.exs new file mode 100644 index 00000000..10fa92c5 --- /dev/null +++ b/test/fishjam/room/id_test.exs @@ -0,0 +1,56 @@ +defmodule Fishjam.Room.IDTest do + use ExUnit.Case + + alias Fishjam.Room.ID, as: Subject + + describe "determine_node/1" do + test "resolves node name from the provided room_id" do + node_name = Node.self() + room_id = Subject.generate() + + assert {:ok, node_name} == Subject.determine_node(room_id) + end + + test "returns error if node is not detected in cluster" do + invalid_node = :invalid_node |> Atom.to_string() |> Base.encode16(case: :lower) + invalid_room_id = "room-id-#{invalid_node}" + assert {:error, :invalid_node} == Subject.determine_node(invalid_room_id) + end + end + + describe "generate/0" do + test "room_id last part is based on the node name" do + room1_id = Subject.generate() + room2_id = Subject.generate() + + node_part_from_room1 = room1_id |> String.split("-") |> Enum.take(-1) + node_part_from_room2 = room2_id |> String.split("-") |> Enum.take(-1) + + assert node_part_from_room1 == node_part_from_room2 + end + + test "generated room_id has 5 parts" do + room_id = Subject.generate() + assert room_id |> String.split("-") |> length() == 5 + end + end + + describe "generate/1" do + setup do + Application.delete_env(:fishjam, :feature_flags) + end + + test "executes generate/0 when feature flag is enabled and generates random id" do + Application.put_env(:fishjam, :feature_flags, custom_room_name_disabled: true) + refute {:ok, "custom_room_name"} == Subject.generate("custom_room_name") + end + + test "parses custom room name when feature flag is disabled" do + assert {:ok, "custom_room_name"} == Subject.generate("custom_room_name") + end + + test "returns error when custom room doesn't meet naming criteria" do + assert {:error, :invalid_room_id} = Subject.generate("invalid_characters//??$@!") + end + end +end diff --git a/test/fishjam_web/controllers/component/file_component_test.exs b/test/fishjam_web/controllers/component/file_component_test.exs index 2639c72f..f3080031 100644 --- a/test/fishjam_web/controllers/component/file_component_test.exs +++ b/test/fishjam_web/controllers/component/file_component_test.exs @@ -24,7 +24,8 @@ defmodule FishjamWeb.Component.FileComponentTest do Application.put_env(:fishjam, :components_used, [Fishjam.Component.File]) media_sources_directory = - Application.fetch_env!(:fishjam, :media_files_path) + :fishjam + |> Application.fetch_env!(:media_files_path) |> Path.join(@file_component_directory) |> Path.expand() diff --git a/test/fishjam_web/controllers/component/hls_component_test.exs b/test/fishjam_web/controllers/component/hls_component_test.exs index 58bd3628..032564a1 100644 --- a/test/fishjam_web/controllers/component/hls_component_test.exs +++ b/test/fishjam_web/controllers/component/hls_component_test.exs @@ -154,7 +154,7 @@ defmodule FishjamWeb.Component.HlsComponentTest do end test "renders component with ll-hls enabled", %{conn: conn, room_id: room_id} do - assert Registry.lookup(Fishjam.RequestHandlerRegistry, room_id) |> Enum.empty?() + assert Fishjam.RequestHandlerRegistry |> Registry.lookup(room_id) |> Enum.empty?() conn = post(conn, ~p"/room/#{room_id}/component", type: "hls", options: %{lowLatency: true}) @@ -183,7 +183,7 @@ defmodule FishjamWeb.Component.HlsComponentTest do assert_receive {:DOWN, _ref, :process, ^engine_pid, :normal}, 10_000 assert_receive {:DOWN, _ref, :process, ^request_handler, :normal} - assert Registry.lookup(Fishjam.RequestHandlerRegistry, room_id) |> Enum.empty?() + assert Fishjam.RequestHandlerRegistry |> Registry.lookup(room_id) |> Enum.empty?() end test "renders errors when video codec is different than h264 - vp8", %{conn: conn} do diff --git a/test/fishjam_web/integration/server_notification_test.exs b/test/fishjam_web/integration/server_notification_test.exs index 6308b5e5..6f5505ad 100644 --- a/test/fishjam_web/integration/server_notification_test.exs +++ b/test/fishjam_web/integration/server_notification_test.exs @@ -469,7 +469,8 @@ defmodule FishjamWeb.Integration.ServerNotificationTest do test "sends message when File adds or removes tracks", %{conn: conn} do media_sources_directory = - Application.fetch_env!(:fishjam, :media_files_path) + :fishjam + |> Application.fetch_env!(:media_files_path) |> Path.join(@file_component_directory) |> Path.expand()