Skip to content

Commit

Permalink
chore: handle file deleted event in reader
Browse files Browse the repository at this point in the history
  • Loading branch information
heywhy committed Jun 21, 2024
1 parent 213f207 commit daa29e7
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 44 deletions.
27 changes: 15 additions & 12 deletions lib/elasticlunr/index/reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,15 @@ defmodule Elasticlunr.Index.Reader do

require Logger

defstruct [:dir, :schema, :segments]
@enforce_keys [:dir, :schema]
defstruct [:dir, :schema, segments: []]

@type t :: %__MODULE__{
dir: Path.t(),
schema: Schema.t(),
segments: [SSTable.t()]
}

@spec new(Path.t(), Schema.t(), keyword()) :: t()
def new(dir, schema, opts \\ []) do
attrs = [
dir: dir,
schema: schema,
segments: Keyword.get(opts, :segments, [])
]

struct!(__MODULE__, attrs)
end

@spec loaded?(t(), FileMeta.t()) :: boolean()
def loaded?(%__MODULE__{segments: segments}, %FileMeta{dir: dir, number: number}) do
dir
Expand All @@ -45,6 +35,19 @@ defmodule Elasticlunr.Index.Reader do
end
end

@spec remove_segment(t(), pos_integer()) :: t()
def remove_segment(%__MODULE__{segments: segments} = reader, number) when is_integer(number) do
segments =
Enum.reject(segments, fn %{path: path} ->
case Filename.parse(path) do
{:sst, ^number} -> true
_ -> false
end
end)

%{reader | segments: segments}
end

@spec get!(t(), String.t()) :: map() | nil | no_return()
def get!(%__MODULE__{schema: schema, segments: segments}, id) do
id = Utils.id_from_string(id)
Expand Down
5 changes: 0 additions & 5 deletions lib/elasticlunr/manifest/changes.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ defmodule Elasticlunr.Manifest.Changes do
Enum.reduce(files, changes, &add_file(&2, level, &1))
end

@spec new_files(t()) :: [FileMeta.t()]
def new_files(%__MODULE__{new_files: new_files}) do
Enum.map(new_files, &elem(&1, 1))
end

@spec delete_files(t(), [pos_integer() | FileMeta.t()]) :: t()
def delete_files(%__MODULE__{delete_files: delete_files} = changes, files) do
fun = fn
Expand Down
8 changes: 7 additions & 1 deletion lib/elasticlunr/server/reader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ defmodule Elasticlunr.Server.Reader do
end
end

def handle_info({:file_deleted, number}, %__MODULE__{reader: reader} = state) do
reader = Reader.remove_segment(reader, number)

{:noreply, %{state | reader: reader}}
end

def handle_info(_msg, state), do: {:noreply, state}

defp read_manifest(%{dir: dir} = state) do
Expand Down Expand Up @@ -109,7 +115,7 @@ defmodule Elasticlunr.Server.Reader do

defp patch_reader(%{dir: dir, manifest: manifest, schema: schema, ss_tables: segments}) do
with :ok <- Manifest.close(manifest) do
{:ok, Reader.new(dir, schema, segments: segments)}
{:ok, %Reader{dir: dir, schema: schema, segments: segments}}
end
end
end
44 changes: 21 additions & 23 deletions lib/elasticlunr/server/writer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -87,20 +87,13 @@ defmodule Elasticlunr.Server.Writer do
) do
manifest = Writer.manifest(writer)

case Manifest.apply_and_log(manifest, changes) do
{:ok, manifest} ->
%Writer{schema: schema} = writer
writer = Writer.remove_obsolete_files(%{writer | manifest: manifest})

new_files = Changes.new_files(changes)

# TODO: publish removed files to reader so that it can be up to date
publish_new_files!(schema.name, new_files)

{:reply, :ok, %{state | writer: writer}}

{:error, reason} ->
{:stop, reason, state}
with {:ok, manifest} <- Manifest.apply_and_log(manifest, changes),
writer = %{writer | manifest: manifest},
writer = Writer.remove_obsolete_files(writer),
:ok <- publish_files_changes!(writer, changes) do
{:reply, :ok, %{state | writer: writer}}
else
{:error, reason} -> {:stop, reason, state}
end
end

Expand All @@ -124,10 +117,7 @@ defmodule Elasticlunr.Server.Writer do
file_metas = Enum.filter(file_metas, &(&1.size > 0))

with file_metas when file_metas != [] <- file_metas,
{:ok, writer} <- add_files_to_manifest(file_metas, writer),
%Writer{schema: schema} <- writer do
publish_new_files!(schema.name, file_metas)

{:ok, writer} <- add_files_to_manifest(file_metas, writer) do
state
|> Map.put(:writer, writer)
# Schedule another compaction in case the generated file fills a level
Expand Down Expand Up @@ -238,8 +228,10 @@ defmodule Elasticlunr.Server.Writer do
|> Changes.add_files(0, file_metas)
|> Changes.set_log_number(manifest.log_number)

with {:ok, manifest} <- Manifest.apply_and_log(manifest, changes) do
{:ok, %{writer | manifest: manifest}}
with {:ok, manifest} <- Manifest.apply_and_log(manifest, changes),
writer = %{writer | manifest: manifest},
:ok <- publish_files_changes!(writer, changes) do
{:ok, writer}
end
end

Expand Down Expand Up @@ -268,9 +260,15 @@ defmodule Elasticlunr.Server.Writer do
end)
end

defp publish_new_files!(index, files) do
files
defp publish_files_changes!(
%{schema: schema},
%{new_files: new_files, delete_files: delete_files}
) do
Enum.each(delete_files, &(:ok = PubSub.publish(schema.name, :file_deleted, &1)))

new_files
|> Enum.map(&elem(&1, 1))
|> Enum.sort_by(& &1.number)
|> Enum.each(&(:ok = PubSub.publish(index, :file_created, &1)))
|> Enum.each(&(:ok = PubSub.publish(schema.name, :file_created, &1)))
end
end
9 changes: 6 additions & 3 deletions test/server/reader_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Elasticlunr.Server.ReaderTest do
use ExUnit.Case, async: true

alias Elasticlunr.Book
alias Elasticlunr.PubSub
alias Elasticlunr.Server.Reader
alias Elasticlunr.Server.Writer

Expand Down Expand Up @@ -74,20 +75,22 @@ defmodule Elasticlunr.Server.ReaderTest do
assert entry.id == document.id
end

@tag skip: "testing should be all about compacting sstables"
test "update internals when a segment is deleted", %{
dir: dir,
pid: pid,
document: document,
schema: schema,
writer: writer
} do
GenServer.call(writer, {:save, new_book()})

# Add an extra write to force generate sstable
GenServer.call(writer, {:save, new_book()})

ss_tables = ss_tables(dir)

assert eventually(fn -> GenServer.call(pid, {:get, document.id}) end)
assert Enum.each(ss_tables, &File.rm_rf!/1)
assert_received {:remove_lockfile, _dir, _path}
Enum.each(ss_tables, &PubSub.publish(schema.name, :file_deleted, &1))
assert eventually(fn -> GenServer.call(pid, {:get, document.id}) == nil end)
end
end

0 comments on commit daa29e7

Please sign in to comment.