Skip to content

Commit

Permalink
Stop existing modules when iotedged starts (#3299)
Browse files Browse the repository at this point in the history
Normally iotedged will stop all modules when it shuts down. But if it crashed, modules will continue to run. On Linux systems where iotedged is responsible for creating/binding the socket (e.g., CentOS 7.5, which uses systemd but does not support systemd socket activation), modules will be left holding stale file descriptors for the workload and management APIs and calls on these APIs will begin to fail. It is expected that modules will be resilient to these sorts of failures, but in reality sometimes they aren't.

This change updates iotedged to stop any existing modules when it is starting. The stopped modules will be started again naturally once iotedged (and Edge Agent) are running again.
  • Loading branch information
damonbarry authored Jul 29, 2020
1 parent e9dc6c1 commit 2d1a609
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 0 deletions.
2 changes: 2 additions & 0 deletions edgelet/edgelet-core/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,7 @@ pub trait ModuleRuntime: Sized {
type SystemInfoFuture: Future<Item = SystemInfo, Error = Self::Error> + Send;
type SystemResourcesFuture: Future<Item = SystemResources, Error = Self::Error> + Send;
type RemoveAllFuture: Future<Item = (), Error = Self::Error> + Send;
type StopAllFuture: Future<Item = (), Error = Self::Error> + Send;

fn create(&self, module: ModuleSpec<Self::Config>) -> Self::CreateFuture;
fn get(&self, id: &str) -> Self::GetFuture;
Expand All @@ -515,6 +516,7 @@ pub trait ModuleRuntime: Sized {
fn logs(&self, id: &str, options: &LogOptions) -> Self::LogsFuture;
fn registry(&self) -> &Self::ModuleRegistry;
fn remove_all(&self) -> Self::RemoveAllFuture;
fn stop_all(&self, wait_before_kill: Option<Duration>) -> Self::StopAllFuture;
}

#[derive(Clone, Copy, Debug)]
Expand Down
26 changes: 26 additions & 0 deletions edgelet/edgelet-docker/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ impl ModuleRuntime for DockerModuleRuntime {
type SystemResourcesFuture =
Box<dyn Future<Item = SystemResources, Error = Self::Error> + Send>;
type RemoveAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;
type StopAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;

fn create(&self, module: ModuleSpec<Self::Config>) -> Self::CreateFuture {
info!("Creating module {}...", module.name());
Expand Down Expand Up @@ -823,6 +824,26 @@ impl ModuleRuntime for DockerModuleRuntime {
future::join_all(n).map(|_| ())
}))
}

/// Stops every module returned by `list`, passing `wait_before_kill` to each
/// individual `stop` call.
///
/// A stop failure whose root cause is `ErrorKind::NotFound` or
/// `ErrorKind::NotModified` is treated as success; any other error is
/// propagated through the returned future.
fn stop_all(&self, wait_before_kill: Option<Duration>) -> Self::StopAllFuture {
    let runtime = self.clone();
    let stop_everything = self.list().and_then(move |modules| {
        let pending = modules.into_iter().map(move |module| {
            let stop_one =
                <DockerModuleRuntime as ModuleRuntime>::stop(&runtime, module.name(), wait_before_kill);
            stop_one.or_else(|err| {
                // Inspect the root cause first so the borrow of `err` ends
                // before it is moved into the `Err` branch below.
                let ignorable = match Fail::find_root_cause(&err).downcast_ref::<ErrorKind>() {
                    Some(ErrorKind::NotFound(_)) | Some(ErrorKind::NotModified) => true,
                    _ => false,
                };
                if ignorable {
                    Ok(())
                } else {
                    Err(err)
                }
            })
        });
        future::join_all(pending).map(|_| ())
    });
    Box::new(stop_everything)
}
}

impl Authenticator for DockerModuleRuntime {
Expand Down Expand Up @@ -1479,6 +1500,7 @@ mod tests {
type SystemResourcesFuture =
Box<dyn Future<Item = SystemResources, Error = Self::Error> + Send>;
type RemoveAllFuture = FutureResult<(), Self::Error>;
type StopAllFuture = FutureResult<(), Self::Error>;

fn create(&self, _module: ModuleSpec<Self::Config>) -> Self::CreateFuture {
unimplemented!()
Expand Down Expand Up @@ -1531,6 +1553,10 @@ mod tests {
// Stub inside the `tests` module: panics via `unimplemented!()` if it is ever called.
fn remove_all(&self) -> Self::RemoveAllFuture {
unimplemented!()
}

// Stub inside the `tests` module: the timeout argument is ignored and the call
// panics via `unimplemented!()` if it is ever reached.
fn stop_all(&self, _wait_before_kill: Option<Duration>) -> Self::StopAllFuture {
unimplemented!()
}
}

impl Authenticator for TestModuleList {
Expand Down
11 changes: 11 additions & 0 deletions edgelet/edgelet-http-mgmt/src/client/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl ModuleRuntime for ModuleClient {
type SystemResourcesFuture =
Box<dyn Future<Item = SystemResources, Error = Self::Error> + Send>;
type RemoveAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;
type StopAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;

fn create(&self, _module: ModuleSpec<Self::Config>) -> Self::CreateFuture {
unimplemented!()
Expand Down Expand Up @@ -346,6 +347,16 @@ impl ModuleRuntime for ModuleClient {
future::join_all(n).map(|_| ())
}))
}

/// Stops every module returned by `list`.
///
/// `wait_before_kill` is forwarded unchanged to each individual `stop` call
/// rather than being replaced by `None`, so the grace period requested by the
/// caller is not silently dropped. Any error from listing or from an
/// individual `stop` is propagated through the returned future.
fn stop_all(&self, wait_before_kill: Option<Duration>) -> Self::StopAllFuture {
    let self_for_stop = self.clone();
    Box::new(self.list().and_then(move |list| {
        // `Option<Duration>` is `Copy`, so the per-module closure can reuse it
        // for every element.
        let n = list.into_iter().map(move |c| {
            <Self as ModuleRuntime>::stop(&self_for_stop, c.name(), wait_before_kill)
        });
        future::join_all(n).map(|_| ())
    }))
}
}

pub struct Logs(String, Body);
Expand Down
5 changes: 5 additions & 0 deletions edgelet/edgelet-kube/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ where
type SystemResourcesFuture =
Box<dyn Future<Item = SystemResources, Error = Self::Error> + Send>;
type RemoveAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;
type StopAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;

fn create(&self, module: ModuleSpec<Self::Config>) -> Self::CreateFuture {
Box::new(create_module(self, module))
Expand Down Expand Up @@ -303,6 +304,10 @@ where
fn remove_all(&self) -> Self::RemoveAllFuture {
Box::new(future::ok(()))
}

fn stop_all(&self, _wait_before_kill: Option<Duration>) -> Self::StopAllFuture {
Box::new(future::ok(()))
}
}

impl<T, S> Authenticator for KubeModuleRuntime<T, S>
Expand Down
5 changes: 5 additions & 0 deletions edgelet/edgelet-test-utils/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ where
type SystemInfoFuture = FutureResult<SystemInfo, Self::Error>;
type SystemResourcesFuture = FutureResult<SystemResources, Self::Error>;
type RemoveAllFuture = FutureResult<(), Self::Error>;
type StopAllFuture = FutureResult<(), Self::Error>;

fn create(&self, _module: ModuleSpec<Self::Config>) -> Self::CreateFuture {
match self.module.as_ref().unwrap() {
Expand Down Expand Up @@ -449,4 +450,8 @@ where
fn remove_all(&self) -> Self::RemoveAllFuture {
future::ok(())
}

fn stop_all(&self, _wait_before_kill: Option<Duration>) -> Self::StopAllFuture {
future::ok(())
}
}
5 changes: 5 additions & 0 deletions edgelet/iotedged/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ pub enum InitializeErrorReason {
#[cfg(windows)]
RegisterWindowsService,
RemoveExistingModules,
StopExistingModules,
SaveSettings,
#[cfg(windows)]
StartWindowsService,
Expand Down Expand Up @@ -341,6 +342,10 @@ impl fmt::Display for InitializeErrorReason {
write!(f, "Could not remove existing modules")
}

InitializeErrorReason::StopExistingModules => {
write!(f, "Could not stop existing modules")
}

InitializeErrorReason::SaveSettings => write!(f, "Could not save settings file"),

#[cfg(windows)]
Expand Down
17 changes: 17 additions & 0 deletions edgelet/iotedged/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::fs::{DirBuilder, File, OpenOptions};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use failure::{Context, Fail, ResultExt};
use futures::future::{Either, IntoFuture};
Expand Down Expand Up @@ -328,6 +329,22 @@ where
crypto.clone(),
)?;

// Normally iotedged will stop all modules when it shuts down. But if it crashed,
// modules will continue to run. On Linux systems where iotedged is responsible for
// creating/binding the socket (e.g., CentOS 7.5, which uses systemd but does not
// support systemd socket activation), modules will be left holding stale file
// descriptors for the workload and management APIs and calls on these APIs will
// begin to fail. Resilient modules should be able to deal with this, but we'll
// stop all modules here to ensure a clean start; they are started again once
// iotedged (and Edge Agent) are running.
const STOP_TIME: Duration = Duration::from_secs(30);
info!("Stopping all modules...");
tokio_runtime
.block_on(runtime.stop_all(Some(STOP_TIME)))
.context(ErrorKind::Initialize(
InitializeErrorReason::StopExistingModules
))?;
info!("Finished stopping modules.");

if $force_reprovision ||
($provisioning_result.reconfigure() != ReprovisioningStatus::DeviceDataNotUpdated) {
// If this device was re-provisioned and the device key was updated it causes
Expand Down

0 comments on commit 2d1a609

Please sign in to comment.