Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release membrane_file_plugin v0.16.0 #47

Merged
merged 4 commits into from
Nov 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_file_plugin` to your list of de
```elixir
def deps do
[
{:membrane_file_plugin, "~> 0.15.0"}
{:membrane_file_plugin, "~> 0.16.0"}
]
end
```
Expand Down
4 changes: 2 additions & 2 deletions examples/sink_and_source.exs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@ defmodule FileExamplePipeline do
@doc false
@impl true
def handle_init(_ctx, target) do
structure = [
spec = [
child(:file_src, %Membrane.File.Source{location: __ENV__.file})
|> child(:file_sink, %Membrane.File.Sink{location: "/tmp/test"})
]

{[spec: structure, playback: :playing], %{target: target}}
{[spec: spec, playback: :playing], %{target: target}}
end

@impl true
Expand Down
12 changes: 6 additions & 6 deletions examples/sink_multi.exs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ defmodule Splitter do
alias Membrane.Buffer
alias Membrane.File.SplitEvent

def_input_pad :input, demand_unit: :bytes, demand_mode: :auto, accepted_format: Membrane.RemoteStream
def_output_pad :output, demand_mode: :auto, accepted_format: Membrane.RemoteStream
def_input_pad :input, flow_control: :auto, accepted_format: Membrane.RemoteStream
def_output_pad :output, flow_control: :auto, accepted_format: Membrane.RemoteStream

def_options head_size: [type: :integer]

Expand All @@ -20,7 +20,7 @@ defmodule Splitter do
end

@impl true
def handle_process(:input, buffer, _ctx, %{head_size: head_size, split?: true}) do
def handle_buffer(:input, buffer, _ctx, %{head_size: head_size, split?: true}) do
<<head::binary-size(head_size), tail::binary>> = buffer.payload

actions = [
Expand All @@ -32,7 +32,7 @@ defmodule Splitter do
{ actions, %{split?: false}}
end

def handle_process(:input, buffer, _ctx, %{split?: false}) do
def handle_buffer(:input, buffer, _ctx, %{split?: false}) do
{[buffer: {:output, buffer}], %{split?: false}}
end
end
Expand All @@ -45,13 +45,13 @@ defmodule SinkMultiExamplePipeline do
@doc false
@impl true
def handle_init(_ctx, target) do
structure = [
spec = [
child(:file_source, %Membrane.File.Source{location: "input.bin"})
|> child(:filter, %Splitter{head_size: 10})
|> child(:file_sink, %Membrane.File.Sink.Multi{location: "/tmp/output", extension: ".bin"})
]

{[spec: structure, playback: :playing], %{target: target}}
{[spec: spec, playback: :playing], %{target: target}}
end

@impl true
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane_file/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Membrane.File.Sink do
description: "Path of the output file"
]

def_input_pad :input, demand_unit: :buffers, accepted_format: _any
def_input_pad :input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any

@impl true
def handle_init(_ctx, %__MODULE__{location: location}) do
Expand All @@ -46,7 +46,7 @@ defmodule Membrane.File.Sink do
end

@impl true
def handle_write(:input, buffer, _ctx, %{fd: fd} = state) do
def handle_buffer(:input, buffer, _ctx, %{fd: fd} = state) do
:ok = @common_file.write!(fd, buffer)
{[demand: :input], state}
end
Expand Down
4 changes: 2 additions & 2 deletions lib/membrane_file/sink_multi.ex
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ defmodule Membrane.File.Sink.Multi do
@spec default_naming_fun(Path.t(), non_neg_integer(), String.t()) :: Path.t()
def default_naming_fun(path, i, ext), do: [path, i, ext] |> Enum.join() |> Path.expand()

def_input_pad :input, demand_unit: :buffers, accepted_format: _any
def_input_pad :input, flow_control: :manual, demand_unit: :buffers, accepted_format: _any

@impl true
def handle_init(_ctx, %__MODULE__{} = options) do
Expand Down Expand Up @@ -77,7 +77,7 @@ defmodule Membrane.File.Sink.Multi do
def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

@impl true
def handle_write(:input, buffer, _ctx, %{fd: fd} = state) do
def handle_buffer(:input, buffer, _ctx, %{fd: fd} = state) do
:ok = @common_file.write!(fd, buffer)
{[demand: :input], state}
end
Expand Down
2 changes: 1 addition & 1 deletion lib/membrane_file/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ defmodule Membrane.File.Source do
"""
]

def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}
def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}, flow_control: :manual

@impl true
def handle_init(_ctx, %__MODULE__{location: location, chunk_size: size, seekable?: seekable?}) do
Expand Down
4 changes: 2 additions & 2 deletions mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.File.Plugin.Mixfile do
use Mix.Project

@version "0.15.0"
@version "0.16.0"

@github_url "https://github.com/membraneframework/membrane_file_plugin"

Expand Down Expand Up @@ -38,7 +38,7 @@ defmodule Membrane.File.Plugin.Mixfile do

defp deps do
[
{:membrane_core, "~> 0.12.9"},
{:membrane_core, "~> 1.0"},
# Testing
{:mox, "~> 1.0", only: :test},
# Development
Expand Down
18 changes: 9 additions & 9 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,21 @@
"bunch": {:hex, :bunch, "1.6.0", "4775f8cdf5e801c06beed3913b0bd53fceec9d63380cdcccbda6be125a6cfd54", [:mix], [], "hexpm", "ef4e9abf83f0299d599daed3764d19e8eac5d27a5237e5e4d5e2c129cfeb9a22"},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"},
"credo": {:hex, :credo, "1.7.0", "6119bee47272e85995598ee04f2ebbed3e947678dee048d10b5feca139435f75", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "6839fcf63d1f0d1c0f450abc8564a57c43d644077ab96f2934563e68b8a769d7"},
"dialyxir": {:hex, :dialyxir, "1.3.0", "fd1672f0922b7648ff9ce7b1b26fcf0ef56dda964a459892ad15f6b4410b5284", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "00b2a4bcd6aa8db9dcb0b38c1225b7277dca9bc370b6438715667071a304696f"},
"earmark_parser": {:hex, :earmark_parser, "1.4.32", "fa739a0ecfa34493de19426681b23f6814573faee95dfd4b4aafe15a7b5b32c6", [:mix], [], "hexpm", "b8b0dd77d60373e77a3d7e8afa598f325e49e8663a51bcc2b88ef41838cca755"},
"credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"},
"dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"},
"earmark_parser": {:hex, :earmark_parser, "1.4.37", "2ad73550e27c8946648b06905a57e4d454e4d7229c2dafa72a0348c99d8be5f7", [:mix], [], "hexpm", "6b19783f2802f039806f375610faa22da130b8edc21209d0bff47918bb48360e"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.29.4", "6257ecbb20c7396b1fe5accd55b7b0d23f44b6aa18017b415cb4c2b91d997729", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "2c6699a737ae46cb61e4ed012af931b57b699643b24dabe2400a8168414bc4f5"},
"ex_doc": {:hex, :ex_doc, "0.30.9", "d691453495c47434c0f2052b08dd91cc32bc4e1a218f86884563448ee2502dd2", [:mix], [{:earmark_parser, "~> 1.4.31", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d7aaaf21e95dc5cddabf89063327e96867d00013963eadf2c6ad135506a8bc10"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"jason": {:hex, :jason, "1.4.0", "e855647bc964a44e2f67df589ccf49105ae039d4179db7f6271dfd3843dc27e6", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "79a3791085b2a0f743ca04cec0f7be26443738779d09302e01318f97bdb82121"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.1", "cc9e3ca312f1cfeccc572b37a09980287e243648108384b97ff2b76e505c3555", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "e127a341ad1b209bd80f7bd1620a15693a9908ed780c3b763bccf7d200c767c6"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"membrane_core": {:hex, :membrane_core, "0.12.9", "b80239deacf98f24cfd2e0703b632e92ddded8b989227cd6e724140f433b0aac", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "389b4b22da0e35d5b053ec2fa87bf36882e0ab88f8fb841af895982fb4abe504"},
"mox": {:hex, :mox, "1.0.2", "dc2057289ac478b35760ba74165b4b3f402f68803dd5aecd3bfd19c183815d64", [:mix], [], "hexpm", "f9864921b3aaf763c8741b5b8e6f908f44566f1e427b2630e89e9a73b981fef2"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.2", "ad87296a092a46e03b7e9b0be7631ddcf64c790fa68a9ef5323b6cbb36affc72", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "f3f5a1ca93ce6e092d92b6d9c049bcda58a3b617a8d888f8e7231c85630e8108"},
"membrane_core": {:hex, :membrane_core, "1.0.0", "1b543aefd952283be1f2a215a1db213aa4d91222722ba03cd35280622f1905ee", [:mix], [{:bunch, "~> 1.6", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 3.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "352c90fd0a29942143c4bf7a727cc05c632e323f50a1a4e99321b1e8982f1533"},
"mox": {:hex, :mox, "1.1.0", "0f5e399649ce9ab7602f72e718305c0f9cdc351190f72844599545e4996af73c", [:mix], [], "hexpm", "d44474c50be02d5b72131070281a5d3895c0e7a95c780e90bc0cfe712f633a13"},
"nimble_parsec": {:hex, :nimble_parsec, "1.3.1", "2c54013ecf170e249e9291ed0a62e5832f70a476c61da16f6aac6dca0189f2af", [:mix], [], "hexpm", "2682e3c0b2eb58d90c6375fc0cc30bc7be06f365bf72608804fb9cffa5e1b167"},
"numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"},
"qex": {:hex, :qex, "0.5.1", "0d82c0f008551d24fffb99d97f8299afcb8ea9cf99582b770bd004ed5af63fd6", [:mix], [], "hexpm", "935a39fdaf2445834b95951456559e9dc2063d0a055742c558a99987b38d6bab"},
"ratio": {:hex, :ratio, "2.4.2", "c8518f3536d49b1b00d88dd20d49f8b11abb7819638093314a6348139f14f9f9", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "441ef6f73172a3503de65ccf1769030997b0d533b1039422f1e5e0e0b4cbf89e"},
"ratio": {:hex, :ratio, "3.0.2", "60a5976872a4dc3d873ecc57eed1738589e99d1094834b9c935b118231297cfb", [:mix], [{:decimal, "~> 1.6 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}, {:numbers, "~> 5.2.0", [hex: :numbers, repo: "hexpm", optional: false]}], "hexpm", "3a13ed5a30ad0bfd7e4a86bf86d93d2b5a06f5904417d38d3f3ea6406cdfc7bb"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
}
14 changes: 5 additions & 9 deletions test/integration/source_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@ defmodule Membrane.File.Integration.SourceTest do

def_input_pad :input,
accepted_format: _,
mode: :pull,
demand_mode: :auto,
demand_unit: :bytes
flow_control: :auto

def_output_pad :output, accepted_format: _, mode: :pull, demand_mode: :auto
def_output_pad :output, accepted_format: _, flow_control: :auto

@impl true
def handle_parent_notification(event, _context, state) do
{[event: {:input, event}], state}
end

@impl true
def handle_process(:input, buffer, _context, state) do
def handle_buffer(:input, buffer, _context, state) do
{[buffer: {:output, buffer}], state}
end
end
Expand All @@ -38,14 +36,13 @@ defmodule Membrane.File.Integration.SourceTest do
spec = [
child(:source, %Source{
location: @input_text_file,
chunk_size: 2,
seekable?: true
})
|> child(:filter, Filter)
|> child(:sink, Sink)
]

{:ok, _supervisor_pid, pipeline_pid} = Pipeline.start(structure: spec)
{:ok, _supervisor_pid, pipeline_pid} = Pipeline.start(spec: spec)
refute_sink_buffer(pipeline_pid, :sink, _)

Pipeline.execute_actions(pipeline_pid,
Expand Down Expand Up @@ -78,14 +75,13 @@ defmodule Membrane.File.Integration.SourceTest do
spec = [
child(:source, %Source{
location: @input_text_file,
chunk_size: 2,
seekable?: true
})
|> child(:filter, Filter)
|> child(:sink, Sink)
]

{:ok, _supervisor_pid, pipeline_pid} = Pipeline.start(structure: spec)
{:ok, _supervisor_pid, pipeline_pid} = Pipeline.start(spec: spec)
refute_sink_buffer(pipeline_pid, :sink, _)

Pipeline.execute_actions(pipeline_pid,
Expand Down
4 changes: 2 additions & 2 deletions test/membrane_file/sink_multi_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defmodule Membrane.File.Sink.MultiTest do

setup :state_and_ctx

describe "handle_write" do
describe "handle_buffer" do
setup :inject_mock_fd

test "should write received chunk and request demand", %{state: state} do
Expand All @@ -31,7 +31,7 @@ defmodule Membrane.File.Sink.MultiTest do
CommonMock |> expect(:write!, fn ^file, ^buffer -> :ok end)

assert {[demand: :input], state} ==
@module.handle_write(:input, buffer, nil, state)
@module.handle_buffer(:input, buffer, nil, state)
end
end

Expand Down
40 changes: 19 additions & 21 deletions test/membrane_file/sink_source_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ defmodule Membrane.File.SinkSourceIntegrationTest do
end

test "File copy", ctx do
structure = [
spec = [
child(:file_source, %MbrFile.Source{location: ctx.input_path})
|> child(:file_sink, %MbrFile.Sink{location: ctx.output_path})
]

assert {:ok, _supervisor_pid, pid} = Pipeline.start_link(structure: structure)
assert {:ok, _supervisor_pid, pid} = Pipeline.start_link(spec: spec)
assert_start_of_stream(pid, :file_sink, :input)
assert_end_of_stream(pid, :file_sink, :input, 5_000)
Pipeline.terminate(pid, blocking?: true)
Pipeline.terminate(pid)

assert File.read!(ctx.output_path) == ctx.content
end
Expand All @@ -56,15 +56,15 @@ defmodule Membrane.File.SinkSourceIntegrationTest do

generator = fn state, _size -> {actions, state} end

structure = [
spec = [
child(:testing_source, %Source{output: {nil, generator}})
|> child(:file_sink, %MbrFile.Sink{location: ctx.output_path})
]

assert pid = Pipeline.start_link_supervised!(structure: structure)
assert pid = Pipeline.start_link_supervised!(spec: spec)
assert_start_of_stream(pid, :file_sink, :input)
assert_end_of_stream(pid, :file_sink, :input, 5_000)
Pipeline.terminate(pid, blocking?: true)
Pipeline.terminate(pid)

assert File.read!(ctx.output_path) == expected_content
end
Expand All @@ -73,29 +73,28 @@ defmodule Membrane.File.SinkSourceIntegrationTest do
use Membrane.Filter

def_input_pad :input,
demand_unit: :bytes,
demand_mode: :auto,
flow_control: :auto,
accepted_format: Membrane.RemoteStream

def_output_pad :output, demand_mode: :auto, accepted_format: Membrane.RemoteStream
def_output_pad :output, flow_control: :auto, accepted_format: Membrane.RemoteStream

@impl true
def handle_process(:input, buffer, _ctx, state) do
def handle_buffer(:input, buffer, _ctx, state) do
{[buffer: {:output, buffer}], state}
end
end

test "File copy with filter", ctx do
structure = [
spec = [
child(:file_source, %MbrFile.Source{location: ctx.input_path})
|> child(:filter, EmptyFilter)
|> child(:file_sink, %MbrFile.Sink{location: ctx.output_path})
]

assert {:ok, _supervisor_pid, pid} = Pipeline.start_link(structure: structure)
assert {:ok, _supervisor_pid, pid} = Pipeline.start_link(spec: spec)
assert_start_of_stream(pid, :file_sink, :input)
assert_end_of_stream(pid, :file_sink, :input, 5_000)
Pipeline.terminate(pid, blocking?: true)
Pipeline.terminate(pid)

assert File.read!(ctx.output_path) == ctx.content
end
Expand All @@ -107,11 +106,10 @@ defmodule Membrane.File.SinkSourceIntegrationTest do
alias Membrane.File.SplitEvent

def_input_pad :input,
demand_unit: :bytes,
demand_mode: :auto,
flow_control: :auto,
accepted_format: Membrane.RemoteStream

def_output_pad :output, demand_mode: :auto, accepted_format: Membrane.RemoteStream
def_output_pad :output, flow_control: :auto, accepted_format: Membrane.RemoteStream

def_options head_size: [type: :integer]

Expand All @@ -121,7 +119,7 @@ defmodule Membrane.File.SinkSourceIntegrationTest do
end

@impl true
def handle_process(:input, buffer, _ctx, %{head_size: head_size, split?: true}) do
def handle_buffer(:input, buffer, _ctx, %{head_size: head_size, split?: true}) do
<<head::binary-size(head_size), tail::binary>> = buffer.payload

actions = [
Expand All @@ -133,24 +131,24 @@ defmodule Membrane.File.SinkSourceIntegrationTest do
{actions, %{split?: false}}
end

def handle_process(:input, buffer, _ctx, %{split?: false}) do
def handle_buffer(:input, buffer, _ctx, %{split?: false}) do
{[buffer: {:output, buffer}], %{split?: false}}
end
end

test "MultiSink with splitter", ctx do
head_size = 10

structure = [
spec = [
child(:file_source, %MbrFile.Source{location: ctx.input_path})
|> child(:filter, %Splitter{head_size: head_size})
|> child(:file_sink, %MbrFile.Sink.Multi{location: ctx.output_path, extension: ".bin"})
]

assert {:ok, _supervisor_pid, pid} = Pipeline.start_link(structure: structure)
assert {:ok, _supervisor_pid, pid} = Pipeline.start_link(spec: spec)
assert_start_of_stream(pid, :file_sink, :input)
assert_end_of_stream(pid, :file_sink, :input, 5_000)
Pipeline.terminate(pid, blocking?: true)
Pipeline.terminate(pid)

assert File.read!(ctx.output_path <> "0.bin") == binary_part(ctx.content, 0, head_size)

Expand Down
Loading