Skip to content

Commit

Permalink
Stop existing modules when iotedged starts (#3377)
Browse files Browse the repository at this point in the history
Cherry-picked from master 2d1a609
  • Loading branch information
damonbarry authored Aug 7, 2020
1 parent 25229b2 commit 7066164
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 0 deletions.
2 changes: 2 additions & 0 deletions edgelet/edgelet-core/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ pub trait ModuleRuntime: Sized {
type SystemInfoFuture: Future<Item = SystemInfo, Error = Self::Error> + Send;
type SystemResourcesFuture: Future<Item = SystemResources, Error = Self::Error> + Send;
type RemoveAllFuture: Future<Item = (), Error = Self::Error> + Send;
type StopAllFuture: Future<Item = (), Error = Self::Error> + Send;

fn create(&self, module: ModuleSpec<Self::Config>) -> Self::CreateFuture;
fn get(&self, id: &str) -> Self::GetFuture;
Expand All @@ -499,6 +500,7 @@ pub trait ModuleRuntime: Sized {
fn logs(&self, id: &str, options: &LogOptions) -> Self::LogsFuture;
fn registry(&self) -> &Self::ModuleRegistry;
fn remove_all(&self) -> Self::RemoveAllFuture;
/// Requests that every module known to this runtime be stopped; the returned future
/// resolves with `()` on success or with `Self::Error` on failure.
///
/// NOTE(review): `wait_before_kill` is presumably the grace period a module is given
/// before it is forcibly killed, mirroring the per-module `stop` — confirm against `stop`.
fn stop_all(&self, wait_before_kill: Option<Duration>) -> Self::StopAllFuture;
}

#[derive(Clone, Copy, Debug)]
Expand Down
26 changes: 26 additions & 0 deletions edgelet/edgelet-docker/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ impl ModuleRuntime for DockerModuleRuntime {
type SystemResourcesFuture =
Box<dyn Future<Item = SystemResources, Error = Self::Error> + Send>;
type RemoveAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;
type StopAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;

fn create(&self, module: ModuleSpec<Self::Config>) -> Self::CreateFuture {
info!("Creating module {}...", module.name());
Expand Down Expand Up @@ -844,6 +845,26 @@ impl ModuleRuntime for DockerModuleRuntime {
future::join_all(n).map(|_| ())
}))
}

/// Stops every module reported by `list()` and resolves once all stop requests finish.
///
/// `wait_before_kill` is handed unchanged to each per-module `stop` call. A stop that
/// fails with a root cause of `ErrorKind::NotFound` or `ErrorKind::NotModified` is
/// treated as a success; any other failure (including a failure to list modules)
/// fails the returned future.
fn stop_all(&self, wait_before_kill: Option<Duration>) -> Self::StopAllFuture {
    // The closures below must own their runtime handle, so work on a clone.
    let runtime = self.clone();
    let stop_everything = self.list().and_then(move |modules| {
        let pending = modules.into_iter().map(move |module| {
            let stopping = <DockerModuleRuntime as ModuleRuntime>::stop(
                &runtime,
                module.name(),
                wait_before_kill,
            );
            stopping.or_else(|err| {
                // Decide first, then move `err` only if it has to be propagated.
                let ignorable = match Fail::find_root_cause(&err).downcast_ref::<ErrorKind>() {
                    Some(ErrorKind::NotFound(_)) | Some(ErrorKind::NotModified) => true,
                    _ => false,
                };
                if ignorable {
                    Ok(())
                } else {
                    Err(err)
                }
            })
        });
        // Collapse the per-module results into a single unit value.
        future::join_all(pending).map(|_| ())
    });
    Box::new(stop_everything)
}
}

impl Authenticator for DockerModuleRuntime {
Expand Down Expand Up @@ -1500,6 +1521,7 @@ mod tests {
type SystemResourcesFuture =
Box<dyn Future<Item = SystemResources, Error = Self::Error> + Send>;
type RemoveAllFuture = FutureResult<(), Self::Error>;
type StopAllFuture = FutureResult<(), Self::Error>;

fn create(&self, _module: ModuleSpec<Self::Config>) -> Self::CreateFuture {
unimplemented!()
Expand Down Expand Up @@ -1552,6 +1574,10 @@ mod tests {
/// Stub for the test runtime: bulk removal is not implemented here, so calling this
/// panics via `unimplemented!()`.
fn remove_all(&self) -> Self::RemoveAllFuture {
unimplemented!()
}

/// Stub for the test runtime: bulk stop is not implemented here, so calling this
/// panics via `unimplemented!()`. The `_wait_before_kill` argument is ignored.
fn stop_all(&self, _wait_before_kill: Option<Duration>) -> Self::StopAllFuture {
unimplemented!()
}
}

impl Authenticator for TestModuleList {
Expand Down
11 changes: 11 additions & 0 deletions edgelet/edgelet-http-mgmt/src/client/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ impl ModuleRuntime for ModuleClient {
type SystemResourcesFuture =
Box<dyn Future<Item = SystemResources, Error = Self::Error> + Send>;
type RemoveAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;
type StopAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;

fn create(&self, _module: ModuleSpec<Self::Config>) -> Self::CreateFuture {
unimplemented!()
Expand Down Expand Up @@ -346,6 +347,16 @@ impl ModuleRuntime for ModuleClient {
future::join_all(n).map(|_| ())
}))
}

/// Stops every module returned by `list()` and resolves once all stop requests finish.
///
/// `wait_before_kill` is forwarded to each per-module `stop` call instead of being
/// replaced by `None`, so the caller's requested grace period is not silently lost.
/// A failure to list modules, or a failure of any individual stop, fails the
/// returned future.
fn stop_all(&self, wait_before_kill: Option<Duration>) -> Self::StopAllFuture {
    // The closures below must own their client handle, so work on a clone.
    let self_for_stop = self.clone();
    Box::new(self.list().and_then(move |list| {
        let n = list.into_iter().map(move |c| {
            <Self as ModuleRuntime>::stop(&self_for_stop, c.name(), wait_before_kill)
        });
        // Collapse the per-module results into a single unit value.
        future::join_all(n).map(|_| ())
    }))
}
}

pub struct Logs(String, Body);
Expand Down
5 changes: 5 additions & 0 deletions edgelet/edgelet-kube/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ where
type SystemResourcesFuture =
Box<dyn Future<Item = SystemResources, Error = Self::Error> + Send>;
type RemoveAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;
type StopAllFuture = Box<dyn Future<Item = (), Error = Self::Error> + Send>;

fn create(&self, module: ModuleSpec<Self::Config>) -> Self::CreateFuture {
Box::new(create_module(self, module))
Expand Down Expand Up @@ -315,6 +316,10 @@ where
/// Returns an already-successful future without removing anything.
/// NOTE(review): presumably module cleanup is left to the cluster — confirm.
fn remove_all(&self) -> Self::RemoveAllFuture {
Box::new(future::ok(()))
}

/// Returns an already-successful future without stopping anything; the
/// `_wait_before_kill` argument is ignored.
/// NOTE(review): presumably module lifecycle is left to the cluster — confirm.
fn stop_all(&self, _wait_before_kill: Option<Duration>) -> Self::StopAllFuture {
Box::new(future::ok(()))
}
}

impl<T, S> Authenticator for KubeModuleRuntime<T, S>
Expand Down
5 changes: 5 additions & 0 deletions edgelet/edgelet-test-utils/src/module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ where
type SystemInfoFuture = FutureResult<SystemInfo, Self::Error>;
type SystemResourcesFuture = FutureResult<SystemResources, Self::Error>;
type RemoveAllFuture = FutureResult<(), Self::Error>;
type StopAllFuture = FutureResult<(), Self::Error>;

fn create(&self, _module: ModuleSpec<Self::Config>) -> Self::CreateFuture {
match self.module.as_ref().unwrap() {
Expand Down Expand Up @@ -452,4 +453,8 @@ where
/// Test-utility implementation: reports success immediately and performs no work.
fn remove_all(&self) -> Self::RemoveAllFuture {
future::ok(())
}

/// Test-utility implementation: reports success immediately and performs no work;
/// the `_wait_before_kill` argument is ignored.
fn stop_all(&self, _wait_before_kill: Option<Duration>) -> Self::StopAllFuture {
future::ok(())
}
}
5 changes: 5 additions & 0 deletions edgelet/iotedged/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ pub enum InitializeErrorReason {
#[cfg(windows)]
RegisterWindowsService,
RemoveExistingModules,
StopExistingModules,
SaveSettings,
#[cfg(windows)]
StartWindowsService,
Expand Down Expand Up @@ -341,6 +342,10 @@ impl fmt::Display for InitializeErrorReason {
write!(f, "Could not remove existing modules")
}

InitializeErrorReason::StopExistingModules => {
write!(f, "Could not stop existing modules")
}

InitializeErrorReason::SaveSettings => write!(f, "Could not save settings file"),

#[cfg(windows)]
Expand Down
17 changes: 17 additions & 0 deletions edgelet/iotedged/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use std::fs::{DirBuilder, File, OpenOptions};
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::Duration;

use failure::{Context, Fail, ResultExt};
use futures::future::{Either, IntoFuture};
Expand Down Expand Up @@ -328,6 +329,22 @@ where
crypto.clone(),
)?;

// Normally iotedged will stop all modules when it shuts down. But if it crashed,
// modules will continue to run. On Linux systems where iotedged is responsible for
// creating/binding the socket (e.g., CentOS 7.5, which uses systemd but does not
// support systemd socket activation), modules will be left holding stale file
// descriptors for the workload and management APIs and calls on these APIs will
// begin to fail. Resilient modules should be able to deal with this, but we'll
// restart all modules to ensure a clean start.
const STOP_TIME: Duration = Duration::from_secs(30);
info!("Stopping all modules...");
tokio_runtime
.block_on(runtime.stop_all(Some(STOP_TIME)))
.context(ErrorKind::Initialize(
InitializeErrorReason::StopExistingModules
))?;
info!("Finished stopping modules.");

if $force_reprovision ||
($provisioning_result.reconfigure() != ReprovisioningStatus::DeviceDataNotUpdated) {
// If this device was re-provisioned and the device key was updated it causes
Expand Down

0 comments on commit 7066164

Please sign in to comment.