Skip to content
This repository has been archived by the owner on Jul 2, 2024. It is now read-only.

Commit

Permalink
Merge pull request #4307 from Aircloak/edon/fix/jobs
Browse files Browse the repository at this point in the history
Limit shadow db manager concurrency using locks
  • Loading branch information
edongashi authored May 6, 2020
2 parents be241a2 + 0ff8deb commit 2e20930
Showing 1 changed file with 24 additions and 9 deletions.
33 changes: 24 additions & 9 deletions air/lib/air/psql_server/shadow_db/manager.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ defmodule Air.PsqlServer.ShadowDb.Manager do
@doc "Drops the given shadow database."
@spec drop_database(User.t(), String.t()) :: :ok
def drop_database(user, data_source_name) do
run_and_measure(fn ->
run_queued("drop_database (#{data_source_name}/#{user.id})", fn ->
Connection.execute!(
Air.PsqlServer.ShadowDb.connection_params().name,
fn conn ->
Expand Down Expand Up @@ -79,14 +79,23 @@ defmodule Air.PsqlServer.ShadowDb.Manager do

@impl GenServer
def handle_cast(:ensure_exists, state) do
run_and_measure(fn -> ensure_db!(state.user, state.data_source_name) end)
run_queued("ensure_db (#{state.data_source_name}/#{state.user.id})", fn ->
ensure_db!(state.user, state.data_source_name)
end)

{:noreply, state}
end

@impl GenServer
def handle_cast(:update_definition, state) do
tables = data_source_tables(state.user, state.data_source_name)
if state.tables != tables, do: run_and_measure(fn -> update_shadow_db(state, tables) end)

if state.tables != tables do
run_queued("update_shadow_db (#{state.data_source_name}/#{state.user.id})", fn ->
update_shadow_db(state, tables)
end)
end

{:noreply, %{state | tables: tables}}
end

Expand All @@ -97,15 +106,21 @@ defmodule Air.PsqlServer.ShadowDb.Manager do
# Internal functions
# -------------------------------------------------------------------

@lock_concurrency 10
@slow_update_duration 500

defp run_and_measure(fun) do
{time, _} = :timer.tc(fun)
ms = time / 1000
defp run_queued(name, fun) do
:global.trans(
{{:shadow_db_update, :rand.uniform(@lock_concurrency)}, self()},
fn ->
{time, _} = :timer.tc(fun)
ms = time / 1000

if ms > @slow_update_duration do
Logger.warn("Updating the shadow database took #{ms}ms")
end
if ms > @slow_update_duration do
Logger.warn("Shadow database operation #{name} took #{ms}ms")
end
end
)
end

defp name(user, data_source_name), do: Air.PsqlServer.ShadowDb.registered_name(user, data_source_name, __MODULE__)
Expand Down

0 comments on commit 2e20930

Please sign in to comment.