Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix last temp file merge, bump core and package version #36

Merged
merged 5 commits into from
Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,25 @@ workflows:
version: 2
build:
jobs:
- elixir/build_test
- elixir/test
- elixir/lint
- elixir/build_test:
filters: &filters
tags:
only: /v.*/
- elixir/test:
filters:
<<: *filters
- elixir/lint:
filters:
<<: *filters
- elixir/hex_publish:
requires:
- elixir/build_test
- elixir/test
- elixir/lint
context:
- Deployment
filters:
branches:
ignore: /.*/
tags:
only: /v.*/
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_file_plugin` to your list of de
```elixir
def deps do
[
{membrane_file_plugin, "~> 0.13.1"}
{:membrane_file_plugin, "~> 0.13.2"}
]
end
```
Expand Down
54 changes: 21 additions & 33 deletions lib/membrane_file/sink.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ defmodule Membrane.File.Sink do
use Membrane.Sink

alias Membrane.File.SeekEvent
alias Membrane.ResourceGuard

@common_file Membrane.File.CommonFileBehaviour.get_impl()

Expand All @@ -34,16 +33,10 @@ defmodule Membrane.File.Sink do
end

@impl true
def handle_setup(ctx, %{location: location} = state) do
def handle_setup(_ctx, %{location: location} = state) do
fd = @common_file.open!(location, [:read, :write])
:ok = @common_file.truncate!(fd)

Membrane.ResourceGuard.register(
ctx.resource_guard,
fn -> @common_file.close!(fd) end,
tag: {:fd, fd}
)

{[], %{state | fd: fd}}
end

Expand All @@ -59,58 +52,53 @@ defmodule Membrane.File.Sink do
end

@impl true
def handle_event(:input, %SeekEvent{insert?: insert?, position: position}, ctx, state) do
def handle_event(:input, %SeekEvent{insert?: insert?, position: position}, _ctx, state) do
state =
if insert?,
do: split_file(state, ctx.resource_guard, position),
else: seek_file(state, ctx.resource_guard, position)
do: split_file(state, position),
else: seek_file(state, position)

{[], state}
end

def handle_event(pad, event, ctx, state), do: super(pad, event, ctx, state)

defp seek_file(%{fd: fd} = state, resource_guard, position) do
state = maybe_merge_temporary(state, resource_guard)
@impl true
def handle_terminate_request(_ctx, state) do
state = maybe_merge_temporary(state)
@common_file.close!(state.fd)

{[terminate: :normal], %{state | fd: nil}}
end

defp seek_file(%{fd: fd} = state, position) do
state = maybe_merge_temporary(state)
_position = @common_file.seek!(fd, position)
state
end

defp split_file(%{fd: fd} = state, resource_guard, position) do
defp split_file(%{fd: fd} = state, position) do
state =
state
|> seek_file(resource_guard, position)
|> open_temporary(resource_guard)
|> seek_file(position)
|> open_temporary()

:ok = @common_file.split!(fd, state.temp_fd)
state
end

defp maybe_merge_temporary(%{temp_fd: nil} = state, _resource_guard), do: state
defp maybe_merge_temporary(%{temp_fd: nil} = state), do: state

defp maybe_merge_temporary(
%{fd: fd, temp_fd: temp_fd, temp_location: temp_location} = state,
resource_guard
) do
defp maybe_merge_temporary(%{fd: fd, temp_fd: temp_fd, temp_location: temp_location} = state) do
# TODO: Consider improving performance for multi-insertion scenarios by using
# multiple temporary files and merging them only once on `handle_prepared_to_stopped/2`.
ResourceGuard.unregister(resource_guard, {:temp_fd, temp_fd})
# multiple temporary files and merging them only once on `handle_terminate_request/2`.
copy_and_remove_temporary(fd, temp_fd, temp_location)
%{state | temp_fd: nil}
end

defp open_temporary(
%{temp_fd: nil, fd: fd, temp_location: temp_location} = state,
resource_guard
) do
defp open_temporary(%{temp_fd: nil, temp_location: temp_location} = state) do
temp_fd = @common_file.open!(temp_location, [:read, :exclusive])

ResourceGuard.register(
resource_guard,
fn -> copy_and_remove_temporary(fd, temp_fd, temp_location) end,
tag: {:temp_fd, temp_fd}
)

%{state | temp_fd: temp_fd}
end

Expand Down
15 changes: 10 additions & 5 deletions lib/membrane_file/sink_multi.ex
Original file line number Diff line number Diff line change
Expand Up @@ -82,16 +82,21 @@ defmodule Membrane.File.Sink.Multi do
{[demand: :input], state}
end

defp open(%{naming_fun: naming_fun, index: index} = state, ctx) do
fd = @common_file.open!(naming_fun.(index), :write)
@impl true
def handle_terminate_request(_ctx, state) do
@common_file.close!(state.fd)

Membrane.ResourceGuard.register(ctx.resource_guard, fn -> @common_file.close!(fd) end, tag: fd)
{[terminate: :normal], %{state | fd: nil}}
end

defp open(%{naming_fun: naming_fun, index: index} = state, _ctx) do
fd = @common_file.open!(naming_fun.(index), :write)

%{state | fd: fd}
end

defp close(%{fd: fd, index: index} = state, ctx) do
Membrane.ResourceGuard.cleanup(ctx.resource_guard, fd)
defp close(%{fd: fd, index: index} = state, _ctx) do
@common_file.close!(fd)

%{state | fd: nil, index: index + 1}
end
Expand Down
14 changes: 8 additions & 6 deletions lib/membrane_file/source.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,9 @@ defmodule Membrane.File.Source do
end

@impl true
def handle_setup(ctx, %{location: location} = state) do
def handle_setup(_ctx, %{location: location} = state) do
fd = @common_file.open!(location, :read)

Membrane.ResourceGuard.register(
ctx.resource_guard,
fn -> @common_file.close!(fd) end
)

{[], %{state | fd: fd}}
end

Expand All @@ -55,6 +50,13 @@ defmodule Membrane.File.Source do
def handle_demand(:output, size, :bytes, _ctx, state),
do: supply_demand(size, [], state)

@impl true
def handle_terminate_request(_ctx, state) do
@common_file.close!(state.fd)

{[terminate: :normal], %{state | fd: nil}}
end

defp supply_demand(size, redemand, %{fd: fd} = state) do
actions =
case @common_file.binread!(fd, size) do
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.File.Plugin.Mixfile do
use Mix.Project

@version "0.13.1"
@version "0.13.2"

@github_url "https://github.com/membraneframework/membrane_file_plugin"

Expand Down
2 changes: 1 addition & 1 deletion mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"},
"makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"},
"makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"},
"membrane_core": {:hex, :membrane_core, "0.11.0", "63ae9f56834ec67680d634d8d69f71b2d46b94f4a0ec8fafcf22d8ce216b8f41", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "097584018fe948fa3013bfd6bcf002b3ad7cbd13f2259be4f1903f37a7aad7ab"},
"membrane_core": {:hex, :membrane_core, "0.11.2", "c8a257bea90c53e0fe99453630a07e4711e4d8ba25e647b3ba346b994aa4f7ab", [:mix], [{:bunch, "~> 1.5", [hex: :bunch, repo: "hexpm", optional: false]}, {:qex, "~> 0.3", [hex: :qex, repo: "hexpm", optional: false]}, {:ratio, "~> 2.0", [hex: :ratio, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "7e2566c9b6d1c22fbb832c22e5f9dbbf7c6cba1c72eeea53bd2f2b73efed58b3"},
"mox": {:hex, :mox, "1.0.1", "b651bf0113265cda0ba3a827fcb691f848b683c373b77e7d7439910a8d754d6e", [:mix], [], "hexpm", "35bc0dea5499d18db4ef7fe4360067a59b06c74376eb6ab3bd67e6295b133469"},
"nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"},
"numbers": {:hex, :numbers, "5.2.4", "f123d5bb7f6acc366f8f445e10a32bd403c8469bdbce8ce049e1f0972b607080", [:mix], [{:coerce, "~> 1.0", [hex: :coerce, repo: "hexpm", optional: false]}, {:decimal, "~> 1.9 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "eeccf5c61d5f4922198395bf87a465b6f980b8b862dd22d28198c5e6fab38582"},
Expand Down
14 changes: 3 additions & 11 deletions test/membrane_file/sink_multi_test.exs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
defmodule Membrane.File.Sink.MultiTest do
use Membrane.File.TestCaseTemplate, module: Membrane.File.Sink.Multi, async: true

import Membrane.Testing.Assertions

alias Membrane.File.{CommonMock, SplitEvent}
alias Membrane.Buffer
alias Membrane.File.{CommonMock, SplitEvent}

@module Membrane.File.Sink.Multi

defp state_and_ctx(_ctx) do
{:ok, resource_guard} = Membrane.Testing.MockResourceGuard.start_link()

%{
ctx: %{resource_guard: resource_guard},
ctx: nil,
state: %{
location: "",
fd: nil,
Expand All @@ -25,8 +21,6 @@ defmodule Membrane.File.Sink.MultiTest do

setup :state_and_ctx

setup :verify_on_exit!
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why have you deleted this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

use Membrane.File.TestCaseTemplate already sets this up


describe "handle_write" do
setup :inject_mock_fd

Expand Down Expand Up @@ -55,13 +49,11 @@ defmodule Membrane.File.Sink.MultiTest do
%{fd: file} = state

CommonMock
|> expect(:close!, fn ^file -> :ok end)
|> expect(:open!, fn "1", _modes -> :new_file end)

assert {[], %{state | index: 1, fd: :new_file}} ==
@module.handle_event(:input, %SplitEvent{}, ctx, state)

assert_resource_guard_cleanup(ctx.resource_guard, ^file)
assert_resource_guard_register(ctx.resource_guard, _fun, :new_file)
end

test "should not close current file and open new one if event type is not state.split_on", %{
Expand Down
35 changes: 34 additions & 1 deletion test/membrane_file/sink_source_integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ defmodule Membrane.File.SinkSourceIntegrationTest do
import Membrane.Testing.Assertions
import Mox, only: [set_mox_global: 1]

alias Membrane.Buffer
alias Membrane.File, as: MbrFile
alias Membrane.Testing.Pipeline
alias Membrane.Testing.{Source, Pipeline}

@moduletag :tmp_dir

Expand Down Expand Up @@ -36,6 +37,38 @@ defmodule Membrane.File.SinkSourceIntegrationTest do
assert File.read!(ctx.output_path) == ctx.content
end

test "Sink temporary file merge when pipeline terminates", ctx do
expected_content = """
Roses are red,
Violets are blue,
If you're reading this,
I'm sorry for you.
"""

{first_part, second_part} = String.split_at(expected_content, 32)

actions = [
{:buffer, {:output, %Buffer{payload: second_part}}},
{:event, {:output, %MbrFile.SeekEvent{position: :bof, insert?: true}}},
{:buffer, {:output, %Buffer{payload: first_part}}},
{:end_of_stream, :output}
]

generator = fn state, _size -> {actions, state} end

structure = [
child(:testing_source, %Source{output: {nil, generator}})
|> child(:file_sink, %MbrFile.Sink{location: ctx.output_path})
]

assert pid = Pipeline.start_link_supervised!(structure: structure)
assert_start_of_stream(pid, :file_sink, :input)
assert_end_of_stream(pid, :file_sink, :input, 5_000)
Pipeline.terminate(pid, blocking?: true)

assert File.read!(ctx.output_path) == expected_content
end

defmodule EmptyFilter do
use Membrane.Filter

Expand Down
Loading