Skip to content

Commit

Permalink
chore: improve options strategy and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
heywhy committed May 18, 2024
1 parent 8d8a786 commit 94321f4
Show file tree
Hide file tree
Showing 16 changed files with 287 additions and 158 deletions.
4 changes: 2 additions & 2 deletions .tool-versions
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
elixir 1.15.7-otp-25
erlang 25.3
elixir 1.16.2-otp-26
erlang 26.2.5
51 changes: 24 additions & 27 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -1,29 +1,26 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"type": "mix_task",
"name": "mix (Default task)",
"request": "launch",
"projectDir": "${workspaceRoot}"
},
{
"type": "mix_task",
"name": "mix test",
"request": "launch",
"task": "test",
"taskArgs": [
"--trace"
],
"startApps": true,
"projectDir": "${workspaceFolder}",
"requireFiles": [
"test/**/test_helper.exs",
"test/**/*_test.exs"
]
}
]
"version": "0.2.0",
"configurations": [
{
"type": "mix_task",
"name": "mix (Default task)",
"request": "launch",
"projectDir": "${workspaceRoot}"
},
{
"type": "mix_task",
"name": "mix test",
"request": "launch",
"task": "test",
"taskArgs": [
"--trace"
],
"startApps": true,
"projectDir": "${workspaceFolder}",
"requireFiles": [
"test/**/test_helper.exs",
"test/**/*_test.exs"
]
}
]
}
15 changes: 9 additions & 6 deletions lib/elasticlunr/index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ defmodule Elasticlunr.Index do

@before_compile Elasticlunr.Index

Module.register_attribute(__MODULE__, :compaction_strategy, [])
Module.register_attribute(__MODULE__, :options, [])

@spec child_spec(keyword()) :: Supervisor.child_spec()
def child_spec(arg) do
%{
Expand Down Expand Up @@ -51,12 +54,12 @@ defmodule Elasticlunr.Index do
def delete(id), do: Index.Supervisor.delete(@name, id)

@spec __schema__() :: Schema.t()
if @compaction_strategy do
def __schema__ do
struct!(@schema, compaction_strategy: @compaction_strategy)
end
else
def __schema__, do: @schema
def __schema__ do
opts = [compaction_strategy: @compaction_strategy, options: @options]

opts
|> Enum.reject(&(elem(&1, 1) |> is_nil()))
|> then(&struct!(@schema, &1))
end

@spec running?() :: boolean()
Expand Down
6 changes: 1 addition & 5 deletions lib/elasticlunr/index/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ defmodule Elasticlunr.Index.Supervisor do

require Logger

@otp_app :elasticlunr
# default to 160mb
@mem_table_max_size 167_772_160
@registry Elasticlunr.IndexRegistry

@spec save(binary(), map()) :: map()
Expand Down Expand Up @@ -63,11 +60,10 @@ defmodule Elasticlunr.Index.Supervisor do
@impl true
def init(%Schema{compaction_strategy: compaction} = schema) do
dir = create_dir!(schema)
mem_table_max_size = Application.get_env(@otp_app, :mem_table_max_size, @mem_table_max_size)

children = [
{Compaction, dir: dir, schema: schema, strategy: compaction},
{Writer, dir: dir, schema: schema, mem_table_max_size: mem_table_max_size},
{Writer, dir: dir, schema: schema},
{Reader, dir: dir, schema: schema}
]

Expand Down
42 changes: 24 additions & 18 deletions lib/elasticlunr/index/writer.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule Elasticlunr.Index.Writer do
alias Elasticlunr.Manifest.Changes
alias Elasticlunr.MemTable
alias Elasticlunr.MemTable.Entry, as: MemTableEntry
alias Elasticlunr.Options
alias Elasticlunr.Schema
alias Elasticlunr.SSTable
alias Elasticlunr.Utils
Expand All @@ -17,23 +18,21 @@ defmodule Elasticlunr.Index.Writer do

require Logger

defstruct [:dir, :schema, :wal, :mem_table, :mt_max_size, :manifest]
defstruct [:dir, :schema, :wal, :mem_table, :manifest]

@type t :: %__MODULE__{
wal: nil | Wal.t(),
dir: Path.t(),
schema: Schema.t(),
mt_max_size: pos_integer(),
manifest: nil | Manifest.t(),
mem_table: nil | MemTable.t()
}

@spec new(Path.t(), Schema.t(), pos_integer()) :: t()
def new(dir, schema, mt_max_size) do
@spec new(Path.t(), Schema.t()) :: t()
def new(dir, schema) do
attrs = [
dir: dir,
schema: schema,
mt_max_size: mt_max_size
schema: schema
]

struct!(__MODULE__, attrs)
Expand Down Expand Up @@ -128,16 +127,16 @@ defmodule Elasticlunr.Index.Writer do
end

defp recover_from_logs(
%{dir: dir, log_files: log_files, manifest: manifest, writer: writer} = state
%{dir: dir, log_files: log_files, manifest: manifest, options: options} = state
) do
mergeable_fields = [:manifest, :mem_table, :compactions, :last_log_number]

params = %{
dir: dir,
manifest: manifest,
mem_table: MemTable.new(),
mt_max_size: writer.mt_max_size,
last_log_number: List.last(log_files)
last_log_number: List.last(log_files),
max_buffer_size: options.max_buffer_size
}

mark_file_number = fn %{manifest: manifest, log_number: log_number} = params ->
Expand Down Expand Up @@ -198,10 +197,10 @@ defmodule Elasticlunr.Index.Writer do
|> Filename.log(log_number)
|> Iterator.new!()
|> Enum.reduce_while(params, fn entry, acc ->
%{mem_table: mt, compactions: c, manifest: manifest, mt_max_size: mms} = acc
%{mem_table: mt, compactions: c, manifest: manifest, max_buffer_size: mbs} = acc
mt = update_mt.(mt, entry)

with {true, mt} <- {MemTable.size(mt) >= mms, mt},
with {true, mt} <- {MemTable.size(mt) >= mbs, mt},
{:ok, manifest} <- flush_mt.(mt, dir, manifest) do
{:cont, %{acc | mem_table: MemTable.new(), manifest: manifest, compactions: c + 1}}
else
Expand Down Expand Up @@ -276,22 +275,24 @@ defmodule Elasticlunr.Index.Writer do
end

defp create_db_if_missing(%{dir: dir} = writer) do
options = options(writer)
path = Filename.current(dir)
state = %{dir: dir, options: options, writer: writer}

with false <- File.exists?(path),
:ok <- new_db(dir) do
{:ok, %{dir: dir, writer: writer}}
:ok <- new_db(dir, options) do
{:ok, state}
else
true -> {:ok, %{dir: dir, writer: writer}}
true -> {:ok, state}
error -> error
end
end

defp new_db(dir) do
defp new_db(dir, options) do
path = Filename.manifest(dir, 1)

with :ok <- File.touch(path),
manifest = Manifest.new(1, dir),
manifest = Manifest.new(1, dir, options),
changes = Changes.set_next_file_number(2),
{:ok, manifest} <- Manifest.apply_and_log(manifest, changes),
:ok <- Manifest.close(manifest) do
Expand Down Expand Up @@ -319,9 +320,14 @@ defmodule Elasticlunr.Index.Writer do
end
end

@spec options(t()) :: Options.t()
def options(%__MODULE__{schema: schema}), do: schema.options

@spec buffer_filled?(t()) :: boolean()
def buffer_filled?(%__MODULE__{mem_table: mem_table, mt_max_size: mt_max_size}) do
MemTable.size(mem_table) >= mt_max_size
def buffer_filled?(%__MODULE__{mem_table: mem_table} = writer) do
%Options{max_buffer_size: max_size} = options(writer)

MemTable.size(mem_table) >= max_size
end

@spec close(t()) :: :ok | no_return()
Expand Down
106 changes: 81 additions & 25 deletions lib/elasticlunr/manifest.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,47 @@ defmodule Elasticlunr.Manifest do
alias Elasticlunr.FileMeta
alias Elasticlunr.Filename
alias Elasticlunr.Manifest.Changes
alias Elasticlunr.Options

use Rop

defstruct [
:fd,
:number,
:log_number,
:next_file_number,
:files,
:versions
]
@fields ~w[fd l0_compaction_trigger max_bytes_for_base_level max_bytes_for_level_multiplier max_level number]a

@enforce_keys @fields
defstruct @fields ++
[
log_number: 0,
next_file_number: 0,
files: %{},
compaction_score: {-1, -1}
]

@type t :: %__MODULE__{
fd: File.io_device(),
number: non_neg_integer(),
max_level: non_neg_integer(),
log_number: non_neg_integer(),
next_file_number: non_neg_integer(),
l0_compaction_trigger: pos_integer(),
compaction_score: {float(), integer()},
max_bytes_for_base_level: pos_integer(),
max_bytes_for_level_multiplier: pos_integer(),
files: %{non_neg_integer() => [FileMeta.t()]}
}

@opts [:append, :binary]

@spec new(pos_integer(), Path.t()) :: t()
def new(number, dir) do
@spec new(pos_integer(), Path.t(), Options.t()) :: t()
def new(number, dir, options \\ %Options{}) do
path = Filename.manifest(dir, number)

attrs = %{
log_number: 0,
next_file_number: 0,
files: %{},
versions: [],
number: number,
fd: File.open!(path, @opts)
fd: File.open!(path, @opts),
max_level: options.max_level,
l0_compaction_trigger: options.l0_compaction_trigger,
max_bytes_for_base_level: options.max_bytes_for_base_level,
max_bytes_for_level_multiplier: options.max_bytes_for_level_multiplier
}

struct!(__MODULE__, attrs)
Expand Down Expand Up @@ -69,6 +77,9 @@ defmodule Elasticlunr.Manifest do
do_apply(manifest, changes) >>> log_changes()
end

@spec needs_compaction?(t()) :: boolean()
def needs_compaction?(%__MODULE__{compaction_score: {score, _level}}), do: score >= 1

@spec known_files(t()) :: MapSet.t(pos_integer())
def known_files(%__MODULE__{files: files}) do
Enum.reduce(files, MapSet.new(), fn {_level, files}, set ->
Expand Down Expand Up @@ -106,17 +117,63 @@ defmodule Elasticlunr.Manifest do
defp do_apply(%__MODULE__{} = manifest, %Changes{} = changes) do
set_next_file_number(%{changes: changes, manifest: manifest})
|> validate_or_set_log_number() >>>
merge_files()
merge_files() >>>
compute_compaction_score()
end

defp level_files(files, level), do: Map.get(files, level, [])

defp compute_compaction_score(%{manifest: manifest} = params) do
score_fn = fn
files, 0 = level ->
files
|> level_files(level)
|> Enum.count()
|> Kernel./(manifest.l0_compaction_trigger)

files, level ->
max_bytes_for_level =
level_max_bytes(
level,
manifest.max_bytes_for_base_level,
manifest.max_bytes_for_level_multiplier
)

files
|> level_files(level)
|> total_file_size()
|> Kernel./(max_bytes_for_level)
end

{_, best_score, best_level} =
Enum.reduce(
0..manifest.max_level,
{manifest.files, -1, -1},
fn level, {files, best_score, _best_level} = acc ->
score = score_fn.(files, level)

case score > best_score do
false -> acc
true -> {files, score, level}
end
end
)

{:ok, %{params | manifest: %{manifest | compaction_score: {best_score, best_level}}}}
end

defp total_file_size(files), do: Enum.reduce(files, 0, &(&1.size + &2))

defp level_max_bytes(level, max_size, multiplier), do: max_size * multiplier ** level

defp merge_files(%{changes: changes, manifest: manifest} = params) do
%__MODULE__{files: files} = manifest
%Changes{new_files: new_files} = changes

files =
Enum.reduce(new_files, files, fn {level, file}, files ->
files
|> Map.get(level, [])
|> level_files(level)
|> then(&([file] ++ &1))
|> then(&Map.put(files, level, &1))
end)
Expand All @@ -125,14 +182,13 @@ defmodule Elasticlunr.Manifest do
end

defp log_changes(%{changes: changes, manifest: manifest}) do
changes
|> Changes.encode()
|> then(&[IO.iodata_length(&1), &1])
|> then(&IO.binwrite(manifest.fd, &1))
|> case do
:ok -> {:ok, manifest}
error -> error
end
:ok =
changes
|> Changes.encode()
|> then(&[IO.iodata_length(&1), &1])
|> then(&IO.binwrite(manifest.fd, &1))

{:ok, manifest}
end

defp set_next_file_number(
Expand Down
Loading

0 comments on commit 94321f4

Please sign in to comment.