Skip to content

Commit

Permalink
Switch to custom circular buffer implementation
Browse files Browse the repository at this point in the history
This builds off in progress updates to the `circular_buffer` library to
optimize it for the circular buffer case. This update reduces the work
involved with storing log records. Currently other parts of RingLogger
dominate in timings so the improvement from this update can only be seen
if carefully monitoring the amount of memory used.
  • Loading branch information
fhunleth committed Jul 29, 2020
1 parent 325f684 commit bef2fb7
Show file tree
Hide file tree
Showing 2 changed files with 189 additions and 65 deletions.
140 changes: 140 additions & 0 deletions lib/ring_logger/circular_buffer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
defmodule RingLogger.CircularBuffer do
@moduledoc """
Circular Buffer
This is a modified version of https://github.com/keathley/circular_buffer
that doesn't use `:queue`. It creates less garbage than the `:queue` version
and is slightly faster in trivial benchmarks. RingLogger currently has other
limitations that make it hard to see these improvements.
When creating a circular buffer you must specify the max size:
```
cb = CircularBuffer.new(10)
```
"""

# How does this work?
#
# There are two lists, `a` and `b`. New items are placed into list `a`. Old
# items are removed from list `b`.
#
# List `a` is ordered from newest to oldest, and list `b` is ordered from
# oldest to newest. Everything in list `a` is newer than list `b`.
#
# When the circular buffer is full, the normal case, inserting an
# item involves prepending it to `a` and removing the first item
# in list `b`. The list ordering makes these both O(1).
#
# When no more items can be removed from list `b`, list `a` is
# reversed and becomes the new list `b`.
#
# The functions for getting the oldest and newest items are also
# fast: The oldest item is the head of list `b`. The newest item
# is the head of list `a`.

defstruct [:a, :b, :max_size, :count]

alias __MODULE__, as: CB

@doc """
Creates a new circular buffer with a given size.
"""
def new(size) when is_integer(size) and size > 0 do
%CB{a: [], b: [], max_size: size, count: 0}
end

@doc """
Inserts a new item into the next location of the circular buffer
Amortized run time: O(1)
Worst case run time: O(n)
"""
def insert(%CB{b: b} = cb, item) when b != [] do
%CB{cb | a: [item | cb.a], b: tl(b)}
end

def insert(%CB{count: count, max_size: max_size} = cb, item) when count < max_size do
%CB{cb | a: [item | cb.a], count: cb.count + 1}
end

def insert(%CB{b: []} = cb, item) do
new_b = cb.a |> Enum.reverse() |> tl()
%CB{cb | a: [item], b: new_b}
end

@doc """
Converts a circular buffer to a list.
The list is ordered from oldest to newest elements based on their insertion
order.
Worst case run time: O(n)
"""
def to_list(%CB{} = cb) do
cb.b ++ Enum.reverse(cb.a)
end

@doc """
Returns the newest element in the buffer
Runs in constant time.
"""
def newest(%CB{a: [newest | _rest]}), do: newest
def newest(%CB{b: []}), do: nil

@doc """
Returns the oldest element in the buffer
Mostly runs in constant time. Worst case O(n).
"""
def oldest(%CB{b: [oldest | _rest]}), do: oldest
def oldest(%CB{a: a}), do: List.last(a)

@doc """
Checks the buffer to see if its empty
Runs in constant time
"""
def empty?(%CB{} = cb) do
cb.count == 0
end

defimpl Enumerable do
def count(cb) do
{:ok, cb.count}
end

def member?(cb, element) do
{:ok, Enum.member?(cb.a, element) or Enum.member?(cb.b, element)}
end

def reduce(cb, acc, fun) do
Enumerable.List.reduce(CB.to_list(cb), acc, fun)
end

def slice(_cb) do
{:error, __MODULE__}
end
end

defimpl Collectable do
def into(original) do
collector_fn = fn
cb, {:cont, elem} -> CB.insert(cb, elem)
cb, :done -> cb
_cb, :halt -> :ok
end

{original, collector_fn}
end
end

defimpl Inspect do
import Inspect.Algebra

def inspect(cb, opts) do
concat(["#CircularBuffer<", to_doc(CB.to_list(cb), opts), ">"])
end
end
end
114 changes: 49 additions & 65 deletions lib/ring_logger/server.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,15 @@ defmodule RingLogger.Server do
use GenServer

@moduledoc false
@default_max_size 1024

alias RingLogger.Client

@opts [:max_size]
alias RingLogger.{CircularBuffer, Client}

defmodule State do
@moduledoc false

@default_max_size 1024

defstruct clients: [],
buffer: :queue.new(),
size: 0,
max_size: @default_max_size,
cb: nil,
index: 0
end

Expand Down Expand Up @@ -77,24 +72,32 @@ defmodule RingLogger.Server do

@impl GenServer
def init(opts) do
{:ok, merge_opts(opts, %State{})}
max_size = Keyword.get(opts, :max_size, @default_max_size)

{:ok, %State{cb: CircularBuffer.new(max_size)}}
end

@impl GenServer
def handle_call(:clear, _from, state) do
{:reply, :ok, %{state | buffer: :queue.new(), size: 0, index: state.index + state.size}}
max_size = state.cb.max_size

{:reply, :ok, %{state | cb: CircularBuffer.new(max_size)}}
end

def handle_call(:config, _from, state) do
config =
Map.take(state, @opts)
|> Map.to_list()
config = %{max_size: state.cb.max_size}

{:reply, config, state}
end

def handle_call({:configure, opts}, _from, state) do
{:reply, :ok, merge_opts(opts, state)}
case Keyword.get(opts, :max_size) do
nil ->
{:reply, :ok, state}

max_size ->
{:reply, :ok, %State{state | cb: CircularBuffer.new(max_size)}}
end
end

def handle_call({:attach, client_pid}, _from, state) do
Expand All @@ -105,29 +108,30 @@ defmodule RingLogger.Server do
{:reply, :ok, detach_client(pid, state)}
end

def handle_call({:get, start_index, n}, _from, state) do
resp =
cond do
start_index <= state.index ->
:queue.to_list(state.buffer)
def handle_call({:get, start_index, 0}, _from, state) do
first_index = state.index - Enum.count(state.cb)
adjusted_start_index = max(start_index - first_index, 0)
items = Enum.drop(state.cb, adjusted_start_index)

start_index >= state.index + state.size ->
[]
{:reply, items, state}
end

def handle_call({:get, start_index, n}, _from, state) do
first_index = state.index - Enum.count(state.cb)
last_index = state.index

true ->
{_, buffer_range} = :queue.split(start_index - state.index, state.buffer)
:queue.to_list(buffer_range)
end
{adjusted_start_index, adjusted_n} =
{start_index, n}
|> adjust_left(first_index)
|> adjust_right(last_index)

paged_resp = if n <= 0, do: resp, else: Enum.take(resp, n)
items = Enum.slice(state.cb, adjusted_start_index, adjusted_n)

{:reply, paged_resp, state}
{:reply, items, state}
end

def handle_call({:tail, n}, _from, state) do
start = max(0, state.size - n)
{_, last_n} = :queue.split(start, state.buffer)
{:reply, :queue.to_list(last_n), state}
{:reply, Enum.take(state.cb, -n), state}
end

@impl GenServer
Expand All @@ -146,6 +150,18 @@ defmodule RingLogger.Server do
:ok
end

defp adjust_left({offset, n}, i) when i > offset do
{i, max(0, n - (i - offset))}
end

defp adjust_left(loc, _i), do: loc

defp adjust_right({offset, n}, i) when i < offset + n do
{offset, i - offset}
end

defp adjust_right(loc, _i), do: loc

defp attach_client(client_pid, state) do
if !client_info(client_pid, state) do
ref = Process.monitor(client_pid)
Expand All @@ -172,46 +188,14 @@ defmodule RingLogger.Server do
List.keyfind(state.clients, client_pid, 0)
end

defp merge_opts(opts, state) do
opts =
opts
|> Keyword.take(@opts)
|> Enum.into(%{})

state
|> Map.merge(opts)
|> trim
end

defp trim(%{max_size: max_size, size: size, buffer: buffer} = state)
when size > max_size do
trim = max_size - size

buffer = Enum.reduce(1..trim, buffer, fn _, buf -> :queue.drop(buf) end)

%{state | buffer: buffer, size: size}
end

defp trim(state), do: state

defp push(level, {mod, msg, ts, md}, state) do
index = state.index + state.size
index = state.index
log_entry = {level, {mod, msg, ts, Keyword.put(md, :index, index)}}

Enum.each(state.clients, &send_log(&1, log_entry))

ring_insert(state, log_entry)
end

defp ring_insert(state, item) do
if state.size == state.max_size do
buffer = :queue.drop(state.buffer)
buffer = :queue.in(item, buffer)
%{state | buffer: buffer, index: state.index + 1}
else
buffer = :queue.in(item, state.buffer)
%{state | buffer: buffer, size: state.size + 1}
end
new_cb = CircularBuffer.insert(state.cb, log_entry)
%{state | cb: new_cb, index: index + 1}
end

defp send_log({client_pid, _ref}, log_entry) do
Expand Down

0 comments on commit bef2fb7

Please sign in to comment.