From daa29e72983ab59d9da0d77de8486d8ff1b38259 Mon Sep 17 00:00:00 2001 From: atanda rasheed Date: Fri, 21 Jun 2024 23:57:49 +0100 Subject: [PATCH] chore: handle file deleted event in reader --- lib/elasticlunr/index/reader.ex | 27 ++++++++++-------- lib/elasticlunr/manifest/changes.ex | 5 ---- lib/elasticlunr/server/reader.ex | 8 +++++- lib/elasticlunr/server/writer.ex | 44 ++++++++++++++--------------- test/server/reader_test.exs | 9 ++++-- 5 files changed, 49 insertions(+), 44 deletions(-) diff --git a/lib/elasticlunr/index/reader.ex b/lib/elasticlunr/index/reader.ex index c79cbf9..dc526cd 100644 --- a/lib/elasticlunr/index/reader.ex +++ b/lib/elasticlunr/index/reader.ex @@ -8,7 +8,8 @@ defmodule Elasticlunr.Index.Reader do require Logger - defstruct [:dir, :schema, :segments] + @enforce_keys [:dir, :schema] + defstruct [:dir, :schema, segments: []] @type t :: %__MODULE__{ dir: Path.t(), @@ -16,17 +17,6 @@ defmodule Elasticlunr.Index.Reader do segments: [SSTable.t()] } - @spec new(Path.t(), Schema.t(), keyword()) :: t() - def new(dir, schema, opts \\ []) do - attrs = [ - dir: dir, - schema: schema, - segments: Keyword.get(opts, :segments, []) - ] - - struct!(__MODULE__, attrs) - end - @spec loaded?(t(), FileMeta.t()) :: boolean() def loaded?(%__MODULE__{segments: segments}, %FileMeta{dir: dir, number: number}) do dir @@ -45,6 +35,19 @@ defmodule Elasticlunr.Index.Reader do end end + @spec remove_segment(t(), pos_integer()) :: t() + def remove_segment(%__MODULE__{segments: segments} = reader, number) when is_integer(number) do + segments = + Enum.reject(segments, fn %{path: path} -> + case Filename.parse(path) do + {:sst, ^number} -> true + _ -> false + end + end) + + %{reader | segments: segments} + end + @spec get!(t(), String.t()) :: map() | nil | no_return() def get!(%__MODULE__{schema: schema, segments: segments}, id) do id = Utils.id_from_string(id) diff --git a/lib/elasticlunr/manifest/changes.ex b/lib/elasticlunr/manifest/changes.ex index 5be6850..5bec511 100644 --- a/lib/elasticlunr/manifest/changes.ex +++ b/lib/elasticlunr/manifest/changes.ex @@ -34,11 +34,6 @@ defmodule Elasticlunr.Manifest.Changes do Enum.reduce(files, changes, &add_file(&2, level, &1)) end - @spec new_files(t()) :: [FileMeta.t()] - def new_files(%__MODULE__{new_files: new_files}) do - Enum.map(new_files, &elem(&1, 1)) - end - @spec delete_files(t(), [pos_integer() | FileMeta.t()]) :: t() def delete_files(%__MODULE__{delete_files: delete_files} = changes, files) do fun = fn diff --git a/lib/elasticlunr/server/reader.ex b/lib/elasticlunr/server/reader.ex index f28fe99..55a7fa8 100644 --- a/lib/elasticlunr/server/reader.ex +++ b/lib/elasticlunr/server/reader.ex @@ -67,6 +67,12 @@ defmodule Elasticlunr.Server.Reader do end end + def handle_info({:file_deleted, number}, %__MODULE__{reader: reader} = state) do + reader = Reader.remove_segment(reader, number) + + {:noreply, %{state | reader: reader}} + end + def handle_info(_msg, state), do: {:noreply, state} defp read_manifest(%{dir: dir} = state) do @@ -109,7 +115,7 @@ defmodule Elasticlunr.Server.Reader do defp patch_reader(%{dir: dir, manifest: manifest, schema: schema, ss_tables: segments}) do with :ok <- Manifest.close(manifest) do - {:ok, Reader.new(dir, schema, segments: segments)} + {:ok, %Reader{dir: dir, schema: schema, segments: segments}} end end end diff --git a/lib/elasticlunr/server/writer.ex b/lib/elasticlunr/server/writer.ex index 5f0e6a1..451a3c5 100644 --- a/lib/elasticlunr/server/writer.ex +++ b/lib/elasticlunr/server/writer.ex @@ -87,20 +87,13 @@ defmodule Elasticlunr.Server.Writer do ) do manifest = Writer.manifest(writer) - case Manifest.apply_and_log(manifest, changes) do - {:ok, manifest} -> - %Writer{schema: schema} = writer - writer = Writer.remove_obsolete_files(%{writer | manifest: manifest}) - - new_files = Changes.new_files(changes) - - # TODO: publish removed files to reader so that it can be up to date - publish_new_files!(schema.name, new_files) - - {:reply, :ok, %{state | writer: writer}} - - {:error, reason} -> - {:stop, reason, state} + with {:ok, manifest} <- Manifest.apply_and_log(manifest, changes), + writer = %{writer | manifest: manifest}, + writer = Writer.remove_obsolete_files(writer), + :ok <- publish_files_changes!(writer, changes) do + {:reply, :ok, %{state | writer: writer}} + else + {:error, reason} -> {:stop, reason, state} end end @@ -124,10 +117,7 @@ defmodule Elasticlunr.Server.Writer do file_metas = Enum.filter(file_metas, &(&1.size > 0)) with file_metas when file_metas != [] <- file_metas, - {:ok, writer} <- add_files_to_manifest(file_metas, writer), - %Writer{schema: schema} <- writer do - publish_new_files!(schema.name, file_metas) - + {:ok, writer} <- add_files_to_manifest(file_metas, writer) do state |> Map.put(:writer, writer) # Schedule another compaction in case the generated file fills a level @@ -238,8 +228,10 @@ defmodule Elasticlunr.Server.Writer do |> Changes.add_files(0, file_metas) |> Changes.set_log_number(manifest.log_number) - with {:ok, manifest} <- Manifest.apply_and_log(manifest, changes) do - {:ok, %{writer | manifest: manifest}} + with {:ok, manifest} <- Manifest.apply_and_log(manifest, changes), + writer = %{writer | manifest: manifest}, + :ok <- publish_files_changes!(writer, changes) do + {:ok, writer} end end @@ -268,9 +260,15 @@ defmodule Elasticlunr.Server.Writer do end) end - defp publish_new_files!(index, files) do - files + defp publish_files_changes!( + %{schema: schema}, + %{new_files: new_files, delete_files: delete_files} + ) do + Enum.each(delete_files, &(:ok = PubSub.publish(schema.name, :file_deleted, &1))) + + new_files + |> Enum.map(&elem(&1, 1)) |> Enum.sort_by(& &1.number) - |> Enum.each(&(:ok = PubSub.publish(index, :file_created, &1))) + |> Enum.each(&(:ok = PubSub.publish(schema.name, :file_created, &1))) end end diff --git a/test/server/reader_test.exs b/test/server/reader_test.exs index 485a118..499b641 100644 --- a/test/server/reader_test.exs +++ b/test/server/reader_test.exs @@ -2,6 +2,7 @@ defmodule Elasticlunr.Server.ReaderTest do use ExUnit.Case, async: true alias Elasticlunr.Book + alias Elasticlunr.PubSub alias Elasticlunr.Server.Reader alias Elasticlunr.Server.Writer @@ -74,20 +75,22 @@ defmodule Elasticlunr.Server.ReaderTest do assert entry.id == document.id end - @tag skip: "testing should be all about compacting sstables" test "update internals when a segment is deleted", %{ dir: dir, pid: pid, document: document, + schema: schema, writer: writer } do GenServer.call(writer, {:save, new_book()}) + # Add an extra write to force generate sstable + GenServer.call(writer, {:save, new_book()}) + ss_tables = ss_tables(dir) assert eventually(fn -> GenServer.call(pid, {:get, document.id}) end) - assert Enum.each(ss_tables, &File.rm_rf!/1) - assert_received {:remove_lockfile, _dir, _path} + Enum.each(ss_tables, &PubSub.publish(schema.name, :file_deleted, &1)) assert eventually(fn -> GenServer.call(pid, {:get, document.id}) == nil end) end end