Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

627 support for streaming via stdio #50

Merged
merged 26 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import Config

config :logger, backends: []
varsill marked this conversation as resolved.
Show resolved Hide resolved

if config_env() == :test do
config :membrane_file_plugin, :file_impl, Membrane.File.CommonMock
else
Expand Down
14 changes: 9 additions & 5 deletions examples/sink_and_source.exs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
Mix.install([
{:membrane_core, "~> 0.11"},
{:membrane_core, "~> 1.0"},
{:membrane_file_plugin, path: Path.expand(__DIR__ <> "/..")}
])

defmodule FileExamplePipeline do
@doc """
Example pipeline that reads its source code file and outputs it to /tmp/test.
"""

use Membrane.Pipeline

@doc false
Expand All @@ -14,18 +18,18 @@ defmodule FileExamplePipeline do
|> child(:file_sink, %Membrane.File.Sink{location: "/tmp/test"})
]
kidq330 marked this conversation as resolved.
Show resolved Hide resolved

{[spec: spec, playback: :playing], %{target: target}}
{[spec: spec], %{target: target}}
end

@impl true
def handle_element_end_of_stream({:file_sink, :input}, _ctx, state) do
def handle_element_end_of_stream(:file_sink, :input, _ctx, state) do
send(state.target, :done)
{[], state}
end
end

{:ok, _supervisor_pid, pid} = FileExamplePipeline.start_link(self())
{:ok, _supervisor_pid, pid} = Membrane.Pipeline.start_link(FileExamplePipeline, self())

receive do
:done -> FileExamplePipeline.terminate(pid)
:done -> Membrane.Pipeline.terminate(pid)
end
23 changes: 17 additions & 6 deletions examples/sink_multi.exs
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
Mix.install([
{:membrane_core, "~> 0.11"},
{:membrane_core, "~> 1.0"},
{:membrane_file_plugin, path: Path.expand(__DIR__ <> "/..")}
])

# Filter responsible for generating split events
defmodule Splitter do
@moduledoc """
Receives buffer and splits it into two buffers
of size `head_size` and `buffer.size - head_size`,
sending a split event to multisink in between.
"""

use Membrane.Filter

alias Membrane.Buffer
Expand Down Expand Up @@ -40,6 +46,11 @@ end
:ok = File.write!("input.bin", <<0::integer-unit(8)-size(1024)>>)

defmodule SinkMultiExamplePipeline do
@moduledoc """
Example pipeline that reads a binary file
and performs a multisink split when sending it forward.
"""

use Membrane.Pipeline

@doc false
Expand All @@ -51,22 +62,22 @@ defmodule SinkMultiExamplePipeline do
|> child(:file_sink, %Membrane.File.Sink.Multi{location: "/tmp/output", extension: ".bin"})
]

{[spec: spec, playback: :playing], %{target: target}}
{[spec: spec], %{target: target}}
end

@impl true
def handle_element_end_of_stream({:file_sink, :input}, _ctx, state) do
def handle_element_end_of_stream(:file_sink, :input, _ctx, state) do
send(state.target, :done)
{[], state}
end

def handle_element_end_of_stream(_other, _ctx, state) do
def handle_element_end_of_stream(_elem, _pad, _ctx, state) do
{[], state}
end
end

{:ok, _supervisor_pid, pid} = SinkMultiExamplePipeline.start_link(self())
{:ok, _supervisor_pid, pid} = Membrane.Pipeline.start_link(SinkMultiExamplePipeline, self())

receive do
:done -> SinkMultiExamplePipeline.terminate(pid)
:done -> Membrane.Pipeline.terminate(pid)
end
38 changes: 36 additions & 2 deletions lib/membrane_file/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,20 @@ defmodule Membrane.File.Sink do
@common_file Membrane.File.CommonFileBehaviour.get_impl()

def_options location: [
spec: Path.t(),
description: "Path of the output file"
spec: Path.t() | :stdout,
description: "Path of the output file or :stdout"
kidq330 marked this conversation as resolved.
Show resolved Hide resolved
]

def_input_pad :input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any

@impl true
def handle_init(_ctx, %__MODULE__{location: :stdout}) do
{[],
%{
location: :stdout
}}
end

@impl true
def handle_init(_ctx, %__MODULE__{location: location}) do
{[],
Expand All @@ -32,6 +40,11 @@ defmodule Membrane.File.Sink do
}}
end

@impl true
def handle_setup(_ctx, %{location: :stdout} = state) do
{[], state}
end

@impl true
def handle_setup(_ctx, %{location: location} = state) do
fd = @common_file.open!(location, [:read, :write])
Expand All @@ -45,12 +58,23 @@ defmodule Membrane.File.Sink do
{[demand: :input], state}
end

@impl true
def handle_buffer(:input, buffer, _ctx, %{location: :stdout} = state) do
:ok = @common_file.write!(:stdio, buffer)
{[demand: :input], state}
end

@impl true
def handle_buffer(:input, buffer, _ctx, %{fd: fd} = state) do
:ok = @common_file.write!(fd, buffer)
{[demand: :input], state}
end

@impl true
def handle_event(:input, %SeekSinkEvent{}, _ctx, %{location: :stdout} = _state) do
raise "Seek event not supported for :stdout sink"
end

@impl true
def handle_event(:input, %SeekSinkEvent{insert?: insert?, position: position}, _ctx, state) do
state =
Expand All @@ -63,11 +87,21 @@ defmodule Membrane.File.Sink do

def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

@impl true
def handle_end_of_stream(:input, _ctx, %{location: :stdout} = state) do
{[], state}
end

@impl true
def handle_end_of_stream(:input, _ctx, state) do
{[], do_merge_and_close(state)}
end

@impl true
def handle_terminate_request(_ctx, %{location: :stdout} = state) do
{[terminate: :normal], state}
end

@impl true
def handle_terminate_request(_ctx, state) do
{[terminate: :normal], do_merge_and_close(state)}
Expand Down
56 changes: 54 additions & 2 deletions lib/membrane_file/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ defmodule Membrane.File.Source do
@common_file Membrane.File.CommonFileBehaviour.get_impl()

def_options location: [
spec: Path.t(),
description: "Path to the file"
spec: Path.t() | :stdin,
description: "Path to the file or :stdin"
kidq330 marked this conversation as resolved.
Show resolved Hide resolved
],
chunk_size: [
spec: pos_integer(),
Expand All @@ -37,6 +37,22 @@ defmodule Membrane.File.Source do

def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}, flow_control: :manual

@impl true
def handle_init(_ctx, %__MODULE__{location: :stdin, chunk_size: size, seekable?: seekable?}) do
if seekable? do
raise "Cannot seek when reading from :stdin"
else
{[],
%{
location: :stdin,
chunk_size: size,
should_send_eos: true,
size_to_read: :infinity,
seekable?: false
}}
end
end

@impl true
def handle_init(_ctx, %__MODULE__{location: location, chunk_size: size, seekable?: seekable?}) do
size_to_read = if seekable?, do: 0, else: :infinity
Expand All @@ -52,6 +68,11 @@ defmodule Membrane.File.Source do
}}
end

@impl true
def handle_setup(_ctx, %{location: :stdin} = state) do
{[], state}
end

@impl true
def handle_setup(_ctx, %{location: location} = state) do
fd = @common_file.open!(location, :read)
Expand Down Expand Up @@ -94,6 +115,11 @@ defmodule Membrane.File.Source do
def handle_demand(:output, size, :bytes, _ctx, state),
do: supply_demand(size, [], state)

@impl true
def handle_terminate_request(_ctx, %{location: :stdin} = state) do
{[terminate: :normal], state}
end

@impl true
def handle_terminate_request(_ctx, state) do
@common_file.close!(state.fd)
Expand All @@ -113,6 +139,32 @@ defmodule Membrane.File.Source do
do_supply_demand(min(demand_size, size_to_read), redemand, state)
end

defp do_supply_demand(to_supply_size, redemand, %{location: :stdin} = state) do
{buffer_actions, supplied_size} =
case IO.binread(to_supply_size) do
varsill marked this conversation as resolved.
Show resolved Hide resolved
<<payload::binary>> ->
{[buffer: {:output, %Buffer{payload: payload}}], byte_size(payload)}

:eof ->
{[], 0}
end

actions =
buffer_actions ++
cond do
supplied_size < to_supply_size ->
[end_of_stream: :output]

supplied_size == to_supply_size ->
redemand

true ->
[]
end

{actions, state}
end

defp do_supply_demand(to_supply_size, redemand, state) do
{buffer_actions, supplied_size} =
case @common_file.binread!(state.fd, to_supply_size) do
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ defmodule Membrane.File.Plugin.Mixfile do
defp deps do
[
{:membrane_core, "~> 1.0"},
{:logger_backends, "~> 1.0"},
# Testing
{:mox, "~> 1.0", only: :test},
# Development
Expand Down
1 change: 1 addition & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"logger_backends": {:hex, :logger_backends, "1.0.0", "09c4fad6202e08cb0fbd37f328282f16539aca380f512523ce9472b28edc6bdf", [:mix], [], "hexpm", "1faceb3e7ec3ef66a8f5746c5afd020e63996df6fd4eb8cdb789e5665ae6c9ce"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
Expand Down
18 changes: 18 additions & 0 deletions test/fixtures/file_to_pipe.exs
kidq330 marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import Membrane.ChildrenSpec
import Membrane.Testing.Assertions
alias Membrane.Testing.Pipeline

LoggerBackends.add(LoggerBackends.Console)
LoggerBackends.configure(LoggerBackends.Console, device: :standard_error)

[input | _] = System.argv()

spec = [
child(%Membrane.File.Source{location: input})
|> child(:sink, %Membrane.File.Sink{location: :stdout})
]
kidq330 marked this conversation as resolved.
Show resolved Hide resolved

{:ok, _supervisor, pipeline} = Pipeline.start(spec: spec)

assert_end_of_stream(pipeline, :sink, :input)
Pipeline.terminate(pipeline)
19 changes: 19 additions & 0 deletions test/fixtures/pipe_to_file.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import Membrane.ChildrenSpec
import Membrane.Testing.Assertions
alias Membrane.Testing.Pipeline

LoggerBackends.add(LoggerBackends.Console)
LoggerBackends.configure(LoggerBackends.Console, device: :standard_error)

[output, chunk_size_str | _] = System.argv()
{chunk_size, ""} = Integer.parse(chunk_size_str)

spec = [
child(%Membrane.File.Source{location: :stdin, chunk_size: chunk_size})
|> child(:sink, %Membrane.File.Sink{location: output})
]

{:ok, _supervisor, pipeline} = Pipeline.start(spec: spec)

assert_end_of_stream(pipeline, :sink, :input)
Pipeline.terminate(pipeline)
6 changes: 4 additions & 2 deletions test/integration/source_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@ defmodule Membrane.File.Integration.SourceTest do
use Membrane.Filter

def_input_pad :input,
accepted_format: _,
accepted_format: _any,
flow_control: :auto

def_output_pad :output, accepted_format: _, flow_control: :auto
def_output_pad :output,
accepted_format: _any,
flow_control: :auto

@impl true
def handle_parent_notification(event, _context, state) do
Expand Down
Loading