Skip to content

Commit

Permalink
Device channel cleanups (#1546)
Browse files Browse the repository at this point in the history
  • Loading branch information
joshk authored Sep 25, 2024
1 parent 72be11f commit 45e38c4
Show file tree
Hide file tree
Showing 2 changed files with 102 additions and 106 deletions.
31 changes: 31 additions & 0 deletions lib/nerves_hub/audit_logs/templates.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule NervesHub.AuditLogs.Templates do
alias NervesHub.AuditLogs

def audit_resolve_changed_deployment(device, reference_id) do
description =
if device.deployment_id do
"device #{device.identifier} reloaded deployment and is attached to deployment #{device.deployment.name}"
else
"device #{device.identifier} reloaded deployment and is no longer attached to a deployment"
end

AuditLogs.audit_with_ref!(device, device, description, reference_id)
end

def audit_device_deployment_update_triggered(device, reference_id) do
deployment = device.deployment
firmware = deployment.firmware

description =
"deployment #{deployment.name} update triggered device #{device.identifier} to update firmware #{firmware.uuid}"

AuditLogs.audit_with_ref!(deployment, device, description, reference_id)
end

def audit_device_assigned(device, reference_id) do
description =
"device #{device.identifier} reloaded deployment and is attached to deployment #{device.deployment.name}"

AuditLogs.audit_with_ref!(device, device, description, reference_id)
end
end
177 changes: 71 additions & 106 deletions lib/nerves_hub_web/channels/device_channel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ defmodule NervesHubWeb.DeviceChannel do
require Logger

alias NervesHub.Archives
alias NervesHub.AuditLogs
alias NervesHub.AuditLogs.Templates
alias NervesHub.Deployments
alias NervesHub.Devices
alias NervesHub.Devices.Device
Expand Down Expand Up @@ -39,24 +39,13 @@ defmodule NervesHubWeb.DeviceChannel do
|> Repo.preload(:org)
|> deployment_preload()

if params["fwup_public_keys"] == "on_connect" do
send_public_keys(device, socket, "fwup_public_keys")
end

if params["archive_public_keys"] == "on_connect" do
send_public_keys(device, socket, "archive_public_keys")
end
maybe_send_public_keys(device, socket, params)

# clear out any inflight updates, there shouldn't be one at this point
# we might make a new one right below it, so clear it beforehand
Devices.clear_inflight_update(device)

deployment_channel =
if device.deployment_id do
"deployment:#{device.deployment_id}"
else
"deployment:none"
end
deployment_channel = deployment_channel(device)

subscribe("device:#{device.id}")
subscribe(deployment_channel)
Expand Down Expand Up @@ -105,11 +94,7 @@ defmodule NervesHubWeb.DeviceChannel do

case Registry.register(NervesHub.Devices.Registry, device.id, payload) do
{:error, {:already_registered, _}} ->
if timer = socket.assigns[:registration_timer], do: Process.cancel_timer(timer)

timer = Process.send_after(self(), {:register, attempt + 1}, 500)

{:noreply, assign(socket, registration_timer: timer)}
{:noreply, retry_device_registration(socket, attempt)}

_ ->
{:noreply, assign(socket, :registered?, true)}
Expand Down Expand Up @@ -168,19 +153,7 @@ defmodule NervesHubWeb.DeviceChannel do
|> Deployments.set_deployment()
|> deployment_preload()

description =
if device.deployment_id do
"device #{device.identifier} reloaded deployment and is attached to deployment #{device.deployment.name}"
else
"device #{device.identifier} reloaded deployment and is no longer attached to a deployment"
end

AuditLogs.audit_with_ref!(
device,
device,
description,
socket.assigns.reference_id
)
Templates.audit_resolve_changed_deployment(device, socket.assigns.reference_id)

maybe_update_registry(socket, device, %{deployment_id: device.deployment_id})

Expand Down Expand Up @@ -215,21 +188,10 @@ defmodule NervesHubWeb.DeviceChannel do
firmware_uuid: inflight_update.firmware_uuid
})

deployment = device.deployment
firmware = deployment.firmware

description =
"deployment #{deployment.name} update triggered device #{device.identifier} to update firmware #{firmware.uuid}"

# If we get here, the device is connected and high probability it receives
# the update message so we can Audit and later assert on this audit event
# as a loosely valid attempt to update
AuditLogs.audit_with_ref!(
deployment,
device,
description,
socket.assigns.reference_id
)
Templates.audit_device_deployment_update_triggered(device, socket.assigns.reference_id)

Devices.update_started!(inflight_update)
push(socket, "update", payload)
Expand Down Expand Up @@ -374,14 +336,7 @@ defmodule NervesHubWeb.DeviceChannel do
end

def handle_in("fwup_progress", %{"value" => percent}, %{assigns: %{device: device}} = socket) do
NervesHubWeb.DeviceEndpoint.broadcast_from!(
self(),
"device:#{device.identifier}:internal",
"fwup_progress",
%{
percent: percent
}
)
device_internal_broadcast!(device, "fwup_progress", %{percent: percent})

# if this is the first fwup we see, then mark it as an update attempt
if socket.assigns[:update_started?] do
Expand Down Expand Up @@ -410,12 +365,7 @@ defmodule NervesHubWeb.DeviceChannel do

{:ok, device} = Devices.update_device(device, %{connection_metadata: metadata})

_ =
NervesHubWeb.DeviceEndpoint.broadcast(
"device:#{device.identifier}:internal",
"location:updated",
location
)
device_internal_broadcast!(device, "location:updated", location)

{:reply, :ok, assign(socket, :device, device)}
end
Expand Down Expand Up @@ -478,12 +428,7 @@ defmodule NervesHubWeb.DeviceChannel do
{:health_report, Devices.save_device_health(device_health)},
{:metrics_report, {:ok, _}} <-
{:metrics_report, Metrics.save_metrics(socket.assigns.device.id, metrics)} do
NervesHubWeb.DeviceEndpoint.broadcast_from!(
self(),
"device:#{socket.assigns.device.identifier}:internal",
"health_check_report",
%{}
)
device_internal_broadcast!(socket.assigns.device, "health_check_report", %{})
else
{:health_report, {:error, err}} ->
Logger.warning("Failed to save health check data: #{inspect(err)}")
Expand Down Expand Up @@ -514,13 +459,22 @@ defmodule NervesHubWeb.DeviceChannel do
assign(socket, :device_api_version, Map.get(params, "device_api_version", "1.0.0"))
end

defp retry_device_registration(socket, attempt) do
_ = if timer = socket.assigns[:registration_timer], do: Process.cancel_timer(timer)
timer = Process.send_after(self(), {:register, attempt + 1}, 500)
assign(socket, registration_timer: timer)
end

defp maybe_update_registry(socket, device, updates) do
if socket.assigns[:registered?] do
{_, _} =
Registry.update_value(NervesHub.Devices.Registry, device.id, fn value ->
Map.merge(value, updates)
end)
end
_ =
if socket.assigns[:registered?] do
{_, _} =
Registry.update_value(NervesHub.Devices.Registry, device.id, fn value ->
Map.merge(value, updates)
end)
end

:ok
end

defp log_to_sentry(device, message, extra \\ %{}) do
Expand All @@ -531,11 +485,9 @@ defmodule NervesHubWeb.DeviceChannel do
org_id: device.org_id
})

_ =
Sentry.capture_message(message,
extra: extra,
result: :none
)
_ = Sentry.capture_message(message, extra: extra, result: :none)

:ok
end

defp subscribe(topic) do
Expand All @@ -547,12 +499,21 @@ defmodule NervesHubWeb.DeviceChannel do
Phoenix.PubSub.unsubscribe(NervesHub.PubSub, topic)
end

defp send_public_keys(device, socket, key_type) do
org_keys = NervesHub.Accounts.list_org_keys(device.org)
defp device_internal_broadcast!(device, event, payload) do
topic = "device:#{device.identifier}:internal"
NervesHubWeb.DeviceEndpoint.broadcast_from!(self(), topic, event, payload)
end

defp maybe_send_public_keys(device, socket, params) do
Enum.each(["fwup_public_keys", "archive_public_keys"], fn key_type ->
if params[key_type] == "on_connect" do
org_keys = NervesHub.Accounts.list_org_keys(device.org)

push(socket, key_type, %{
keys: Enum.map(org_keys, fn ok -> ok.key end)
})
push(socket, key_type, %{
keys: Enum.map(org_keys, fn ok -> ok.key end)
})
end
end)
end

defp cancel_deployment_timer(%{assigns: %{assign_deployment_timer: timer}}) do
Expand Down Expand Up @@ -615,10 +576,7 @@ defmodule NervesHubWeb.DeviceChannel do
|> Repo.update!()
|> deployment_preload()

description =
"device #{device.identifier} reloaded deployment and is attached to deployment #{device.deployment.name}"

AuditLogs.audit_with_ref!(device, device, description, socket.assigns.reference_id)
Templates.audit_device_assigned(device, socket.assigns.reference_id)

maybe_update_registry(socket, device, %{deployment_id: device.deployment_id})

Expand All @@ -634,12 +592,7 @@ defmodule NervesHubWeb.DeviceChannel do
end

defp update_deployment_subscription(socket, device) do
deployment_channel =
if device.deployment_id do
"deployment:#{device.deployment_id}"
else
"deployment:none"
end
deployment_channel = deployment_channel(device)

if deployment_channel != socket.assigns.deployment_channel do
unsubscribe(socket.assigns.deployment_channel)
Expand All @@ -653,23 +606,11 @@ defmodule NervesHubWeb.DeviceChannel do
end
end

defp device_deployment_change_jitter_ms() do
jitter = Application.get_env(:nerves_hub, :device_deployment_change_jitter_seconds)

if jitter > 0 do
:rand.uniform(jitter) * 1000
else
0
end
end

defp schedule_health_check() do
if device_health_check_enabled?() do
interval = Application.get_env(:nerves_hub, :device_health_check_interval_minutes)
Process.send_after(self(), :health_check, :timer.minutes(interval))
:ok
defp deployment_channel(device) do
if device.deployment_id do
"deployment:#{device.deployment_id}"
else
:ok
"deployment:none"
end
end

Expand Down Expand Up @@ -703,7 +644,31 @@ defmodule NervesHubWeb.DeviceChannel do
socket
end

defp schedule_health_check() do
if device_health_check_enabled?() do
Process.send_after(self(), :health_check, device_health_check_interval())
:ok
else
:ok
end
end

defp device_health_check_enabled?() do
Application.get_env(:nerves_hub, :device_health_check_enabled)
end

defp device_health_check_interval() do
Application.get_env(:nerves_hub, :device_health_check_interval_minutes)
|> :timer.minutes()
end

defp device_deployment_change_jitter_ms() do
jitter = Application.get_env(:nerves_hub, :device_deployment_change_jitter_seconds)

if jitter > 0 do
:rand.uniform(jitter) * 1000
else
0
end
end
end

0 comments on commit 45e38c4

Please sign in to comment.