From 30e6e95c252a2017cc0e5eac0f759df8916cda01 Mon Sep 17 00:00:00 2001 From: J M <2364004+Blu-J@users.noreply.github.com> Date: Fri, 19 Aug 2022 11:05:23 -0600 Subject: [PATCH] Feat/long running (#1676) * feat: Start the long running container * feat: Long running docker, running, stoping, and uninstalling * feat: Just make the folders that we would like to mount. * fix: Uninstall not working * chore: remove some logging * feat: Smarter cleanup * feat: Wait for start * wip: Need to kill * chore: Remove the bad tracing * feat: Stopping the long running processes without killing the long running * Mino Feat: Change the Manifest To have a new type (#1736) * Add build-essential to README.md (#1716) Update README.md * write image to sparse-aware archive format (#1709) * fix: Add modification to the max_user_watches (#1695) * fix: Add modification to the max_user_watches * chore: Move to initialization * [Feat] follow logs (#1714) * tail logs * add cli * add FE * abstract http to shared * batch new logs * file download for logs * fix modal error when no config Co-authored-by: Chris Guida Co-authored-by: Aiden McClelland Co-authored-by: Matt Hill Co-authored-by: BluJ * Update README.md (#1728) * fix build for patch-db client for consistency (#1722) * fix cli install (#1720) * highlight instructions if not viewed (#1731) * wip: * [ ] Fix the build (dependencies:634 map for option) * fix: Cargo build * fix: Long running wasn't starting * fix: uninstall works Co-authored-by: Chris Guida Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Co-authored-by: Aiden McClelland Co-authored-by: Matt Hill Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com> Co-authored-by: Matt Hill * chore: Fix a dbg! * chore: Make the commands of the docker-inject do inject instead of exec * chore: Fix compile mistake * chore: Change to use simpler Co-authored-by: Chris Guida Co-authored-by: Aiden McClelland <3732071+dr-bonez@users.noreply.github.com> Co-authored-by: Aiden McClelland Co-authored-by: Matt Hill Co-authored-by: Lucy C <12953208+elvece@users.noreply.github.com> Co-authored-by: Matt Hill --- backend/src/action.rs | 25 +- backend/src/backup/mod.rs | 24 +- backend/src/config/action.rs | 13 +- backend/src/config/mod.rs | 41 +- backend/src/config/spec.rs | 14 +- backend/src/db/model.rs | 9 + backend/src/dependencies.rs | 34 +- backend/src/install/cleanup.rs | 63 ++- backend/src/install/mod.rs | 16 +- backend/src/manager/health.rs | 9 +- backend/src/manager/mod.rs | 742 ++++++++++++++++++++++------- backend/src/migration.rs | 13 +- backend/src/net/tor.rs | 2 +- backend/src/procedure/docker.rs | 327 ++++++++++--- backend/src/procedure/mod.rs | 81 +++- backend/src/properties.rs | 2 +- backend/src/s9pk/manifest.rs | 4 +- backend/src/s9pk/mod.rs | 3 +- backend/src/s9pk/reader.rs | 57 ++- backend/src/status/health_check.rs | 11 +- backend/src/volume.rs | 23 + libs/js_engine/src/lib.rs | 12 +- libs/models/src/procedure_name.rs | 27 +- 23 files changed, 1233 insertions(+), 319 deletions(-) diff --git a/backend/src/action.rs b/backend/src/action.rs index 1734c3ce8..c372f9427 100644 --- a/backend/src/action.rs +++ b/backend/src/action.rs @@ -8,7 +8,6 @@ use rpc_toolkit::command; use serde::{Deserialize, Serialize}; use tracing::instrument; -use crate::config::{Config, ConfigSpec}; use crate::context::RpcContext; use crate::id::ImageId; use crate::procedure::{PackageProcedure, ProcedureName}; @@ -16,6 +15,10 @@ use crate::s9pk::manifest::PackageId; use crate::util::serde::{display_serializable, parse_stdin_deserializable, IoFormat}; use crate::util::Version; use crate::volume::Volumes; +use crate::{ + config::{Config, ConfigSpec}, + procedure::docker::DockerContainer, +}; use crate::{Error, ResultExt}; #[derive(Clone, Debug, Default, Deserialize, Serialize)] pub struct Actions(pub BTreeMap); @@ -58,12 +61,13 @@ impl Action { #[instrument] pub fn validate( &self, + container: &Option, eos_version: &Version, volumes: &Volumes, image_ids: &BTreeSet, ) -> Result<(), Error> { self.implementation - .validate(eos_version, volumes, image_ids, true) + .validate(container, eos_version, volumes, image_ids, true) .with_ctx(|_| { ( crate::ErrorKind::ValidateS9pk, @@ -76,6 +80,7 @@ impl Action { pub async fn execute( &self, ctx: &RpcContext, + container: &Option, pkg_id: &PackageId, pkg_version: &Version, action_id: &ActionId, @@ -90,12 +95,12 @@ impl Action { self.implementation .execute( ctx, + container, pkg_id, pkg_version, ProcedureName::Action(action_id.clone()), volumes, input, - true, None, ) .await? @@ -141,10 +146,24 @@ pub async fn action( .get(&mut db, true) .await? .to_owned(); + + let container = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&pkg_id) + .and_then(|p| p.installed()) + .expect(&mut db) + .await + .with_kind(crate::ErrorKind::NotFound)? + .manifest() + .container() + .get(&mut db, false) + .await? + .to_owned(); if let Some(action) = manifest.actions.0.get(&action_id) { action .execute( &ctx, + &container, &manifest.id, &manifest.version, &action_id, diff --git a/backend/src/backup/mod.rs b/backend/src/backup/mod.rs index 7e7a62b36..129f7b35d 100644 --- a/backend/src/backup/mod.rs +++ b/backend/src/backup/mod.rs @@ -14,7 +14,6 @@ use tokio::io::AsyncWriteExt; use tracing::instrument; use self::target::PackageBackupInfo; -use crate::context::RpcContext; use crate::dependencies::reconfigure_dependents_with_live_pointers; use crate::id::ImageId; use crate::install::PKG_ARCHIVE_DIR; @@ -25,6 +24,7 @@ use crate::util::serde::IoFormat; use crate::util::Version; use crate::version::{Current, VersionT}; use crate::volume::{backup_dir, Volume, VolumeId, Volumes, BACKUP_DIR}; +use crate::{context::RpcContext, procedure::docker::DockerContainer}; use crate::{Error, ErrorKind, ResultExt}; pub mod backup_bulk; @@ -73,15 +73,16 @@ pub struct BackupActions { impl BackupActions { pub fn validate( &self, + container: &Option, eos_version: &Version, volumes: &Volumes, image_ids: &BTreeSet, ) -> Result<(), Error> { self.create - .validate(eos_version, volumes, image_ids, false) + .validate(container, eos_version, volumes, image_ids, false) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Backup Create"))?; self.restore - .validate(eos_version, volumes, image_ids, false) + .validate(container, eos_version, volumes, image_ids, false) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Backup Restore"))?; Ok(()) } @@ -100,18 +101,30 @@ impl BackupActions { let mut volumes = volumes.to_readonly(); volumes.insert(VolumeId::Backup, Volume::Backup { readonly: false }); let backup_dir = backup_dir(pkg_id); + let container = crate::db::DatabaseModel::new() + .package_data() + .idx_model(&pkg_id) + .and_then(|p| p.installed()) + .expect(db) + .await + .with_kind(crate::ErrorKind::NotFound)? + .manifest() + .container() + .get(db, false) + .await? + .to_owned(); if tokio::fs::metadata(&backup_dir).await.is_err() { tokio::fs::create_dir_all(&backup_dir).await? } self.create .execute::<(), NoOutput>( ctx, + &container, pkg_id, pkg_version, ProcedureName::CreateBackup, &volumes, None, - false, None, ) .await? @@ -186,6 +199,7 @@ impl BackupActions { #[instrument(skip(ctx, db, secrets))] pub async fn restore( &self, + container: &Option, ctx: &RpcContext, db: &mut Db, secrets: &mut Ex, @@ -202,12 +216,12 @@ impl BackupActions { self.restore .execute::<(), NoOutput>( ctx, + container, pkg_id, pkg_version, ProcedureName::RestoreBackup, &volumes, None, - false, None, ) .await? diff --git a/backend/src/config/action.rs b/backend/src/config/action.rs index c7db786e0..5d1cce45b 100644 --- a/backend/src/config/action.rs +++ b/backend/src/config/action.rs @@ -7,7 +7,6 @@ use serde::{Deserialize, Serialize}; use tracing::instrument; use super::{Config, ConfigSpec}; -use crate::context::RpcContext; use crate::dependencies::Dependencies; use crate::id::ImageId; use crate::procedure::{PackageProcedure, ProcedureName}; @@ -15,6 +14,7 @@ use crate::s9pk::manifest::PackageId; use crate::status::health_check::HealthCheckId; use crate::util::Version; use crate::volume::Volumes; +use crate::{context::RpcContext, procedure::docker::DockerContainer}; use crate::{Error, ResultExt}; #[derive(Debug, Deserialize, Serialize, HasModel)] @@ -33,15 +33,16 @@ impl ConfigActions { #[instrument] pub fn validate( &self, + container: &Option, eos_version: &Version, volumes: &Volumes, image_ids: &BTreeSet, ) -> Result<(), Error> { self.get - .validate(eos_version, volumes, image_ids, true) + .validate(container, eos_version, volumes, image_ids, true) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Config Get"))?; self.set - .validate(eos_version, volumes, image_ids, true) + .validate(container, eos_version, volumes, image_ids, true) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Config Set"))?; Ok(()) } @@ -49,6 +50,7 @@ impl ConfigActions { pub async fn get( &self, ctx: &RpcContext, + container: &Option, pkg_id: &PackageId, pkg_version: &Version, volumes: &Volumes, @@ -56,12 +58,12 @@ impl ConfigActions { self.get .execute( ctx, + container, pkg_id, pkg_version, ProcedureName::GetConfig, volumes, None::<()>, - false, None, ) .await @@ -74,6 +76,7 @@ impl ConfigActions { pub async fn set( &self, ctx: &RpcContext, + container: &Option, pkg_id: &PackageId, pkg_version: &Version, dependencies: &Dependencies, @@ -84,12 +87,12 @@ impl ConfigActions { .set .execute( ctx, + container, pkg_id, pkg_version, ProcedureName::SetConfig, volumes, Some(input), - false, None, ) .await diff --git a/backend/src/config/mod.rs b/backend/src/config/mod.rs index 519d70b5e..dd4bc45c1 100644 --- a/backend/src/config/mod.rs +++ b/backend/src/config/mod.rs @@ -13,7 +13,6 @@ use rpc_toolkit::command; use serde_json::Value; use tracing::instrument; -use crate::context::RpcContext; use crate::db::model::{CurrentDependencies, CurrentDependencyInfo, CurrentDependents}; use crate::dependencies::{ add_dependent_to_current_dependents_lists, break_transitive, heal_all_dependents_transitive, @@ -25,6 +24,7 @@ use crate::s9pk::manifest::{Manifest, PackageId}; use crate::util::display_none; use crate::util::serde::{display_serializable, parse_stdin_deserializable, IoFormat}; use crate::Error; +use crate::{context::RpcContext, procedure::docker::DockerContainer}; pub mod action; pub mod spec; @@ -167,6 +167,7 @@ pub struct ConfigGetReceipts { manifest_volumes: LockReceipt, manifest_version: LockReceipt, manifest_config: LockReceipt, ()>, + docker_container: LockReceipt, } impl ConfigGetReceipts { @@ -202,11 +203,19 @@ impl ConfigGetReceipts { .map(|x| x.manifest().config()) .make_locker(LockType::Write) .add_to_keys(locks); + let docker_container = crate::db::DatabaseModel::new() + .package_data() + .star() + .installed() + .and_then(|x| x.manifest().container()) + .make_locker(LockType::Write) + .add_to_keys(locks); move |skeleton_key| { Ok(Self { manifest_volumes: manifest_volumes.verify(skeleton_key)?, manifest_version: manifest_version.verify(skeleton_key)?, manifest_config: manifest_config.verify(skeleton_key)?, + docker_container: docker_container.verify(skeleton_key)?, }) } } @@ -229,9 +238,11 @@ pub async fn get( .await? .ok_or_else(|| Error::new(eyre!("{} has no config", id), crate::ErrorKind::NotFound))?; + let container = receipts.docker_container.get(&mut db, &id).await?; + let volumes = receipts.manifest_volumes.get(&mut db).await?; let version = receipts.manifest_version.get(&mut db).await?; - action.get(&ctx, &id, &version, &volumes).await + action.get(&ctx, &container, &id, &version, &volumes).await } #[command( @@ -274,6 +285,7 @@ pub struct ConfigReceipts { pub current_dependencies: LockReceipt, dependency_errors: LockReceipt, manifest_dependencies_config: LockReceipt, + docker_container: LockReceipt, } impl ConfigReceipts { @@ -378,6 +390,13 @@ impl ConfigReceipts { .and_then(|x| x.manifest().dependencies().star().config()) .make_locker(LockType::Write) .add_to_keys(locks); + let docker_container = crate::db::DatabaseModel::new() + .package_data() + .star() + .installed() + .and_then(|x| x.manifest().container()) + .make_locker(LockType::Write) + .add_to_keys(locks); move |skeleton_key| { Ok(Self { @@ -397,6 +416,7 @@ impl ConfigReceipts { current_dependencies: current_dependencies.verify(skeleton_key)?, dependency_errors: dependency_errors.verify(skeleton_key)?, manifest_dependencies_config: manifest_dependencies_config.verify(skeleton_key)?, + docker_container: docker_container.verify(skeleton_key)?, }) } } @@ -488,6 +508,8 @@ pub fn configure_rec<'a, Db: DbHandle>( receipts: &'a ConfigReceipts, ) -> BoxFuture<'a, Result<(), Error>> { async move { + let container = receipts.docker_container.get(db, &id).await?; + let container = &container; // fetch data from db let action = receipts .config_actions @@ -511,7 +533,7 @@ pub fn configure_rec<'a, Db: DbHandle>( let ConfigRes { config: old_config, spec, - } = action.get(ctx, id, &version, &volumes).await?; + } = action.get(ctx, container, id, &version, &volumes).await?; // determine new config to use let mut config = if let Some(config) = config.or_else(|| old_config.clone()) { @@ -579,7 +601,15 @@ pub fn configure_rec<'a, Db: DbHandle>( let signal = if !dry_run { // run config action let res = action - .set(ctx, id, &version, &dependencies, &volumes, &config) + .set( + ctx, + container, + id, + &version, + &dependencies, + &volumes, + &config, + ) .await?; // track dependencies with no pointers @@ -671,6 +701,8 @@ pub fn configure_rec<'a, Db: DbHandle>( .unwrap_or_default(); let next = Value::Object(config.clone()); for (dependent, dep_info) in dependents.0.iter().filter(|(dep_id, _)| dep_id != &id) { + let dependent_container = receipts.docker_container.get(db, &dependent).await?; + let dependent_container = &dependent_container; // check if config passes dependent check if let Some(cfg) = receipts .manifest_dependencies_config @@ -685,6 +717,7 @@ pub fn configure_rec<'a, Db: DbHandle>( if let Err(error) = cfg .check( ctx, + dependent_container, dependent, &manifest.version, &manifest.volumes, diff --git a/backend/src/config/spec.rs b/backend/src/config/spec.rs index f8b2d37e3..90a34e8a9 100644 --- a/backend/src/config/spec.rs +++ b/backend/src/config/spec.rs @@ -22,11 +22,11 @@ use sqlx::PgPool; use super::util::{self, CharSet, NumRange, UniqueBy, STATIC_NULL}; use super::{Config, MatchError, NoMatchWithPath, TimeoutError, TypeOf}; -use crate::config::ConfigurationError; use crate::context::RpcContext; use crate::net::interface::InterfaceId; use crate::s9pk::manifest::{Manifest, PackageId}; use crate::Error; +use crate::{config::ConfigurationError, procedure::docker::DockerContainer}; // Config Value Specifications #[async_trait] @@ -1882,6 +1882,7 @@ pub struct ConfigPointerReceipts { manifest_volumes: LockReceipt, manifest_version: LockReceipt, config_actions: LockReceipt, + docker_container: LockReceipt, } impl ConfigPointerReceipts { @@ -1918,12 +1919,20 @@ impl ConfigPointerReceipts { .and_then(|x| x.manifest().config()) .make_locker(LockType::Read) .add_to_keys(locks); + let docker_container = crate::db::DatabaseModel::new() + .package_data() + .star() + .installed() + .and_then(|x| x.manifest().container()) + .make_locker(LockType::Write) + .add_to_keys(locks); move |skeleton_key| { Ok(Self { interface_addresses_receipt: interface_addresses_receipt(skeleton_key)?, manifest_volumes: manifest_volumes.verify(skeleton_key)?, config_actions: config_actions.verify(skeleton_key)?, manifest_version: manifest_version.verify(skeleton_key)?, + docker_container: docker_container.verify(skeleton_key)?, }) } } @@ -1953,11 +1962,12 @@ impl ConfigPointer { let version = receipts.manifest_version.get(db, id).await.ok().flatten(); let cfg_actions = receipts.config_actions.get(db, id).await.ok().flatten(); let volumes = receipts.manifest_volumes.get(db, id).await.ok().flatten(); + let container = receipts.docker_container.get(db, id).await.ok().flatten(); if let (Some(version), Some(cfg_actions), Some(volumes)) = (&version, &cfg_actions, &volumes) { let cfg_res = cfg_actions - .get(ctx, &self.package_id, version, volumes) + .get(ctx, &container, &self.package_id, version, volumes) .await .map_err(|e| ConfigurationError::SystemError(e))?; if let Some(cfg) = cfg_res.config { diff --git a/backend/src/db/model.rs b/backend/src/db/model.rs index 82c9a6424..6991184a0 100644 --- a/backend/src/db/model.rs +++ b/backend/src/db/model.rs @@ -239,6 +239,15 @@ impl PackageDataEntry { PackageDataEntry::Installed { manifest, .. } => manifest, } } + pub fn manifest_borrow(&self) -> &Manifest { + match self { + PackageDataEntry::Installing { manifest, .. } => manifest, + PackageDataEntry::Updating { manifest, .. } => manifest, + PackageDataEntry::Restoring { manifest, .. } => manifest, + PackageDataEntry::Removing { manifest, .. } => manifest, + PackageDataEntry::Installed { manifest, .. } => manifest, + } + } } impl PackageDataEntryModel { pub fn installed(self) -> OptionModel { diff --git a/backend/src/dependencies.rs b/backend/src/dependencies.rs index 90248f290..2b6c76197 100644 --- a/backend/src/dependencies.rs +++ b/backend/src/dependencies.rs @@ -14,7 +14,6 @@ use rpc_toolkit::command; use serde::{Deserialize, Serialize}; use tracing::instrument; -use crate::config::action::{ConfigActions, ConfigRes}; use crate::config::spec::PackagePointerSpec; use crate::config::{not_found, Config, ConfigReceipts, ConfigSpec}; use crate::context::RpcContext; @@ -27,6 +26,10 @@ use crate::util::serde::display_serializable; use crate::util::{display_none, Version}; use crate::volume::Volumes; use crate::Error; +use crate::{ + config::action::{ConfigActions, ConfigRes}, + procedure::docker::DockerContainer, +}; #[command(subcommands(configure))] pub fn dependency() -> Result<(), Error> { @@ -63,6 +66,7 @@ pub struct TryHealReceipts { manifest_version: LockReceipt, current_dependencies: LockReceipt, dependency_errors: LockReceipt, + docker_container: LockReceipt, } impl TryHealReceipts { @@ -110,6 +114,13 @@ impl TryHealReceipts { .map(|x| x.status().dependency_errors()) .make_locker(LockType::Write) .add_to_keys(locks); + let docker_container = crate::db::DatabaseModel::new() + .package_data() + .star() + .installed() + .and_then(|x| x.manifest().container()) + .make_locker(LockType::Write) + .add_to_keys(locks); move |skeleton_key| { Ok(Self { status: status.verify(skeleton_key)?, @@ -117,6 +128,7 @@ impl TryHealReceipts { current_dependencies: current_dependencies.verify(skeleton_key)?, manifest: manifest.verify(skeleton_key)?, dependency_errors: dependency_errors.verify(skeleton_key)?, + docker_container: docker_container.verify(skeleton_key)?, }) } } @@ -193,6 +205,7 @@ impl DependencyError { receipts: &'a TryHealReceipts, ) -> BoxFuture<'a, Result, Error>> { async move { + let container = receipts.docker_container.get(db, id).await?; Ok(match self { DependencyError::NotInstalled => { if receipts.status.get(db, dependency).await?.is_some() { @@ -240,6 +253,7 @@ impl DependencyError { cfg_info .get( ctx, + &container, dependency, &dependency_manifest.version, &dependency_manifest.volumes, @@ -254,6 +268,7 @@ impl DependencyError { if let Err(error) = cfg_req .check( ctx, + &container, id, &dependent_manifest.version, &dependent_manifest.volumes, @@ -494,6 +509,7 @@ impl DependencyConfig { pub async fn check( &self, ctx: &RpcContext, + container: &Option, dependent_id: &PackageId, dependent_version: &Version, dependent_volumes: &Volumes, @@ -503,6 +519,7 @@ impl DependencyConfig { Ok(self .check .sandboxed( + container, ctx, dependent_id, dependent_version, @@ -517,6 +534,7 @@ impl DependencyConfig { pub async fn auto_configure( &self, ctx: &RpcContext, + container: &Option, dependent_id: &PackageId, dependent_version: &Version, dependent_volumes: &Volumes, @@ -524,6 +542,7 @@ impl DependencyConfig { ) -> Result { self.auto_configure .sandboxed( + container, ctx, dependent_id, dependent_version, @@ -545,6 +564,7 @@ pub struct DependencyConfigReceipts { dependency_config_action: LockReceipt, package_volumes: LockReceipt, package_version: LockReceipt, + docker_container: LockReceipt, } impl DependencyConfigReceipts { @@ -607,6 +627,13 @@ impl DependencyConfigReceipts { .map(|x| x.manifest().version()) .make_locker(LockType::Write) .add_to_keys(locks); + let docker_container = crate::db::DatabaseModel::new() + .package_data() + .star() + .installed() + .and_then(|x| x.manifest().container()) + .make_locker(LockType::Write) + .add_to_keys(locks); move |skeleton_key| { Ok(Self { config: config(skeleton_key)?, @@ -616,6 +643,7 @@ impl DependencyConfigReceipts { dependency_config_action: dependency_config_action.verify(&skeleton_key)?, package_volumes: package_volumes.verify(&skeleton_key)?, package_version: package_version.verify(&skeleton_key)?, + docker_container: docker_container.verify(&skeleton_key)?, }) } } @@ -690,6 +718,8 @@ pub async fn configure_logic( let dependency_version = receipts.dependency_version.get(db).await?; let dependency_volumes = receipts.dependency_volumes.get(db).await?; let dependencies = receipts.dependencies.get(db).await?; + let dependency_docker_container = receipts.docker_container.get(db, &*dependency_id).await?; + let pkg_docker_container = receipts.docker_container.get(db, &*pkg_id).await?; let dependency = dependencies .0 @@ -722,6 +752,7 @@ pub async fn configure_logic( } = dependency_config_action .get( &ctx, + &dependency_docker_container, &dependency_id, &dependency_version, &dependency_volumes, @@ -740,6 +771,7 @@ pub async fn configure_logic( let new_config = dependency .auto_configure .sandboxed( + &pkg_docker_container, &ctx, &pkg_id, &pkg_version, diff --git a/backend/src/install/cleanup.rs b/backend/src/install/cleanup.rs index 8d4301fce..27f4bb2be 100644 --- a/backend/src/install/cleanup.rs +++ b/backend/src/install/cleanup.rs @@ -1,6 +1,12 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + path::{Path, PathBuf}, + sync::Arc, +}; use bollard::image::ListImagesOptions; +use color_eyre::Report; +use futures::FutureExt; use patch_db::{DbHandle, LockReceipt, LockTargetId, LockType, PatchDbHandle, Verifier}; use sqlx::{Executor, Postgres}; use tracing::instrument; @@ -349,6 +355,14 @@ where packages.0.remove(id); packages }; + let dependents_paths: Vec = entry + .current_dependents + .0 + .keys() + .flat_map(|x| packages.0.get(x)) + .flat_map(|x| x.manifest_borrow().volumes.values()) + .flat_map(|x| x.pointer_path(&ctx.datadir)) + .collect(); receipts.packages.set(&mut tx, packages).await?; // once we have removed the package entry, we can change all the dependent pointers to null reconfigure_dependents_with_live_pointers(ctx, &mut tx, &receipts.config, &entry).await?; @@ -372,11 +386,11 @@ where .datadir .join(crate::volume::PKG_VOLUME_DIR) .join(&entry.manifest.id); - if tokio::fs::metadata(&volumes).await.is_ok() { - tokio::fs::remove_dir_all(&volumes).await?; - } - tx.commit().await?; + + tracing::debug!("Cleaning up {:?} at {:?}", volumes, dependents_paths); + cleanup_folder(volumes, Arc::new(dependents_paths)).await; remove_tor_keys(secrets, &entry.manifest.id).await?; + tx.commit().await?; Ok(()) } @@ -391,3 +405,42 @@ where .await?; Ok(()) } + +/// Needed to remove, without removing the folders that are mounted in the other docker containers +pub fn cleanup_folder( + path: PathBuf, + dependents_volumes: Arc>, +) -> futures::future::BoxFuture<'static, ()> { + Box::pin(async move { + let meta_data = match tokio::fs::metadata(&path).await { + Ok(a) => a, + Err(e) => { + return; + } + }; + if !meta_data.is_dir() { + tracing::error!("is_not dir, remove {:?}", path); + let _ = tokio::fs::remove_file(&path).await; + return; + } + if !dependents_volumes + .iter() + .any(|v| v.starts_with(&path) || v == &path) + { + tracing::error!("No parents, remove {:?}", path); + let _ = tokio::fs::remove_dir_all(&path).await; + return; + } + let mut read_dir = match tokio::fs::read_dir(&path).await { + Ok(a) => a, + Err(e) => { + return; + } + }; + tracing::error!("Parents, recurse {:?}", path); + while let Some(entry) = read_dir.next_entry().await.ok().flatten() { + let entry_path = entry.path(); + cleanup_folder(entry_path, dependents_volumes.clone()).await; + } + }) +} diff --git a/backend/src/install/mod.rs b/backend/src/install/mod.rs index 089527d63..d2cd0311a 100644 --- a/backend/src/install/mod.rs +++ b/backend/src/install/mod.rs @@ -1318,6 +1318,7 @@ pub async fn install_s9pk( .manifest .migrations .to( + &prev.manifest.container, ctx, version, pkg_id, @@ -1328,6 +1329,7 @@ pub async fn install_s9pk( let migration = manifest .migrations .from( + &manifest.container, ctx, &prev.manifest.version, pkg_id, @@ -1411,6 +1413,7 @@ pub async fn install_s9pk( manifest .backup .restore( + &manifest.container, ctx, &mut tx, &mut sql_tx, @@ -1518,11 +1521,14 @@ async fn handle_recovered_package( tx: &mut patch_db::Transaction<&mut patch_db::PatchDbHandle>, receipts: &ConfigReceipts, ) -> Result<(), Error> { - let configured = if let Some(migration) = - manifest - .migrations - .from(ctx, &recovered.version, pkg_id, version, &manifest.volumes) - { + let configured = if let Some(migration) = manifest.migrations.from( + &manifest.container, + ctx, + &recovered.version, + pkg_id, + version, + &manifest.volumes, + ) { migration.await?.configured } else { false diff --git a/backend/src/manager/health.rs b/backend/src/manager/health.rs index 6419e8a7a..e070ad5a9 100644 --- a/backend/src/manager/health.rs +++ b/backend/src/manager/health.rs @@ -113,7 +113,14 @@ pub async fn check( let health_results = if let Some(started) = started { manifest .health_checks - .check_all(ctx, started, id, &manifest.version, &manifest.volumes) + .check_all( + ctx, + &manifest.container, + started, + id, + &manifest.version, + &manifest.volumes, + ) .await? } else { return Ok(()); diff --git a/backend/src/manager/mod.rs b/backend/src/manager/mod.rs index 017fec6e3..9d1654815 100644 --- a/backend/src/manager/mod.rs +++ b/backend/src/manager/mod.rs @@ -1,6 +1,7 @@ use std::collections::BTreeMap; use std::convert::TryInto; use std::future::Future; +use std::net::Ipv4Addr; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use std::sync::Arc; use std::task::Poll; @@ -13,14 +14,14 @@ use nix::sys::signal::Signal; use num_enum::TryFromPrimitive; use patch_db::DbHandle; use sqlx::{Executor, Postgres}; +use tokio::io::BufReader; use tokio::sync::watch::error::RecvError; use tokio::sync::watch::{channel, Receiver, Sender}; -use tokio::sync::{Notify, RwLock}; +use tokio::sync::{Mutex, Notify, RwLock}; +use tokio::task::JoinHandle; use torut::onion::TorSecretKeyV3; use tracing::instrument; -use crate::context::RpcContext; -use crate::manager::sync::synchronizer; use crate::net::interface::InterfaceId; use crate::net::GeneratedCertificateMountPoint; use crate::notifications::NotificationLevel; @@ -30,6 +31,8 @@ use crate::s9pk::manifest::{Manifest, PackageId}; use crate::status::MainStatus; use crate::util::{Container, NonDetachingJoinHandle, Version}; use crate::Error; +use crate::{context::RpcContext, procedure::docker::DockerContainer}; +use crate::{manager::sync::synchronizer, procedure::docker::DockerInject}; pub mod health; mod sync; @@ -68,6 +71,7 @@ impl ManagerMap { } else { continue; }; + let tor_keys = man.interfaces.tor_keys(secrets, &package).await?; res.insert( (package, man.version.clone()), @@ -145,6 +149,7 @@ impl ManagerMap { pub struct Manager { shared: Arc, thread: Container>, + persistant_container: Arc, } #[derive(TryFromPrimitive)] @@ -176,130 +181,48 @@ pub enum OnStop { Exit, } -#[instrument(skip(state))] +#[instrument(skip(state, persistant))] async fn run_main( state: &Arc, + persistant: Arc, ) -> Result, Error> { let rt_state = state.clone(); - let interfaces = state - .manifest - .interfaces - .0 - .iter() - .map(|(id, info)| { - Ok(( - id.clone(), - info, - state - .tor_keys - .get(id) - .ok_or_else(|| { - Error::new(eyre!("interface {} missing key", id), crate::ErrorKind::Tor) - })? - .clone(), - )) - }) - .collect::, Error>>()?; - let generated_certificate = state - .ctx - .net_controller - .generate_certificate_mountpoint(&state.manifest.id, &interfaces) - .await?; - let mut runtime = - tokio::spawn(async move { start_up_image(rt_state, generated_certificate).await }); - let ip; - loop { - match state - .ctx - .docker - .inspect_container(&state.container_name, None) - .await - { - Ok(res) => { - if let Some(ip_addr) = res - .network_settings - .and_then(|ns| ns.networks) - .and_then(|mut n| n.remove("start9")) - .and_then(|es| es.ip_address) - .filter(|ip| !ip.is_empty()) - .map(|ip| ip.parse()) - .transpose()? - { - ip = ip_addr; - break; - } - } - Err(bollard::errors::Error::DockerResponseServerError { - status_code: 404, // NOT FOUND - .. - }) => (), - Err(e) => Err(e)?, - } - match futures::poll!(&mut runtime) { - Poll::Ready(res) => { - return res - .map_err(|_| { - Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker) - }) - .and_then(|a| a) - } - _ => (), - } - } - - state - .ctx - .net_controller - .add(&state.manifest.id, ip, interfaces, generated_certificate) - .await?; + let interfaces = states_main_interfaces(state)?; + let generated_certificate = generate_certificate(state, &interfaces).await?; - state - .commit_health_check_results - .store(true, Ordering::SeqCst); - let health = async { - tokio::time::sleep(Duration::from_secs(10)).await; // only sleep for 1 second before first health check - loop { - let mut db = state.ctx.db.handle(); - if let Err(e) = health::check( - &state.ctx, - &mut db, - &state.manifest.id, - &state.commit_health_check_results, + persistant.wait_for_persistant().await; + let is_injectable_main = check_is_injectable_main(&state); + let mut runtime = match is_injectable_main { + true => { + tokio::spawn( + async move { start_up_inject_image(rt_state, generated_certificate).await }, ) - .await - { - tracing::error!( - "Failed to run health check for {}: {}", - &state.manifest.id, - e - ); - tracing::debug!("{:?}", e); - } - tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_COOLDOWN_SECONDS)).await; } + false => tokio::spawn(async move { start_up_image(rt_state, generated_certificate).await }), }; - let _ = state - .status - .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { - if x == Status::Starting as usize { - Some(Status::Running as usize) - } else { - None - } - }); + let ip = match is_injectable_main { + false => Some(match get_running_ip(state, &mut runtime).await { + GetRunninIp::Ip(x) => x, + GetRunninIp::Error(e) => return Err(e), + GetRunninIp::EarlyExit(x) => return Ok(x), + }), + true => None, + }; + + if let Some(ip) = ip { + add_network_for_main(state, ip, interfaces, generated_certificate).await?; + } + + set_commit_health_true(state); + let health = main_health_check_daemon(state.clone()); + fetch_starting_to_running(state); let res = tokio::select! { a = runtime => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).and_then(|a| a), _ = health => Err(Error::new(eyre!("Health check daemon exited!"), crate::ErrorKind::Unknown)), }; - state - .ctx - .net_controller - .remove( - &state.manifest.id, - ip, - state.manifest.interfaces.0.keys().cloned(), - ) - .await?; + if let Some(ip) = ip { + remove_network_for_main(state, ip).await?; + } res } @@ -314,12 +237,34 @@ async fn start_up_image( .main .execute::<(), NoOutput>( &rt_state.ctx, + &rt_state.manifest.container, + &rt_state.manifest.id, + &rt_state.manifest.version, + ProcedureName::Main, + &rt_state.manifest.volumes, + None, + None, + ) + .await +} + +/// We want to start up the manifest, but in this case we want to know that we have generated the certificates. +/// Note for _generated_certificate: Needed to know that before we start the state we have generated the certificate +async fn start_up_inject_image( + rt_state: Arc, + _generated_certificate: GeneratedCertificateMountPoint, +) -> Result, Error> { + rt_state + .manifest + .main + .inject::<(), NoOutput>( + &rt_state.ctx, + &rt_state.manifest.container, &rt_state.manifest.id, &rt_state.manifest.version, ProcedureName::Main, &rt_state.manifest.volumes, None, - false, None, ) .await @@ -346,14 +291,17 @@ impl Manager { }); shared.synchronize_now.notify_one(); let thread_shared = shared.clone(); + let persistant_container = PersistantContainer::new(&thread_shared); + let managers_persistant = persistant_container.clone(); let thread = tokio::spawn(async move { tokio::select! { - _ = manager_thread_loop(recv, &thread_shared) => (), + _ = manager_thread_loop(recv, &thread_shared, managers_persistant) => (), _ = synchronizer(&*thread_shared) => (), } }); Ok(Manager { shared, + persistant_container, thread: Container::new(Some(thread.into())), }) } @@ -400,41 +348,53 @@ impl Manager { .commit_health_check_results .store(false, Ordering::SeqCst); let _ = self.shared.on_stop.send(OnStop::Exit); - let action = match &self.shared.manifest.main { - PackageProcedure::Docker(a) => a, - #[cfg(feature = "js_engine")] - PackageProcedure::Script(_) => return Ok(()), - }; - match self - .shared - .ctx - .docker - .stop_container( - &self.shared.container_name, - Some(StopContainerOptions { - t: action - .sigterm_timeout - .map(|a| *a) - .unwrap_or(Duration::from_secs(30)) - .as_secs_f64() as i64, - }), - ) - .await + let sigterm_timeout: Option = match &self.shared.manifest.main { - Err(bollard::errors::Error::DockerResponseServerError { - status_code: 404, // NOT FOUND - .. + PackageProcedure::Docker(DockerProcedure { + sigterm_timeout, .. }) - | Err(bollard::errors::Error::DockerResponseServerError { - status_code: 409, // CONFLICT - .. - }) - | Err(bollard::errors::Error::DockerResponseServerError { - status_code: 304, // NOT MODIFIED - .. - }) => (), // Already stopped - a => a?, + | PackageProcedure::DockerInject(DockerInject { + sigterm_timeout, .. + }) => sigterm_timeout.clone(), + #[cfg(feature = "js_engine")] + PackageProcedure::Script(_) => return Ok(()), }; + self.persistant_container.stop().await; + + if !check_is_injectable_main(&self.shared) { + match self + .shared + .ctx + .docker + .stop_container( + &self.shared.container_name, + Some(StopContainerOptions { + t: sigterm_timeout + .map(|a| *a) + .unwrap_or(Duration::from_secs(30)) + .as_secs_f64() as i64, + }), + ) + .await + { + Err(bollard::errors::Error::DockerResponseServerError { + status_code: 404, // NOT FOUND + .. + }) + | Err(bollard::errors::Error::DockerResponseServerError { + status_code: 409, // CONFLICT + .. + }) + | Err(bollard::errors::Error::DockerResponseServerError { + status_code: 304, // NOT MODIFIED + .. + }) => (), // Already stopped + a => a?, + }; + } else { + stop_non_first(&*self.shared.container_name).await; + } + self.shared.status.store( Status::Shutdown as usize, std::sync::atomic::Ordering::SeqCst, @@ -456,7 +416,11 @@ impl Manager { } } -async fn manager_thread_loop(mut recv: Receiver, thread_shared: &Arc) { +async fn manager_thread_loop( + mut recv: Receiver, + thread_shared: &Arc, + persistant_container: Arc, +) { loop { fn handle_stop_action<'a>( recv: &'a mut Receiver, @@ -496,7 +460,7 @@ async fn manager_thread_loop(mut recv: Receiver, thread_shared: &Arc (), // restart Ok(Err(e)) => { let mut db = thread_shared.ctx.db.handle(); @@ -546,6 +510,372 @@ async fn manager_thread_loop(mut recv: Receiver, thread_shared: &Arc, Error>>>>>, + should_stop_running: Arc, + wait_for_start: (Sender, Receiver), +} + +impl PersistantContainer { + #[instrument(skip(thread_shared))] + fn new(thread_shared: &Arc) -> Arc { + let wait_for_start = channel(false); + let container = Arc::new(Self { + container_name: thread_shared.container_name.clone(), + running_docker: Arc::new(Mutex::new(None)), + should_stop_running: Arc::new(AtomicBool::new(false)), + wait_for_start: wait_for_start, + }); + tokio::spawn(persistant_container( + thread_shared.clone(), + container.clone(), + )); + container + } + #[instrument(skip(self))] + async fn stop(&self) { + let container_name = &self.container_name; + self.should_stop_running.store(true, Ordering::SeqCst); + let mut running_docker = self.running_docker.lock().await; + *running_docker = None; + use tokio::process::Command; + if let Err(_err) = Command::new("docker") + .args(["stop", "-t", "0", &*container_name]) + .output() + .await + {} + if let Err(_err) = Command::new("docker") + .args(["kill", &*container_name]) + .output() + .await + {} + } + + async fn wait_for_persistant(&self) { + let mut changed_rx = self.wait_for_start.1.clone(); + loop { + if !*changed_rx.borrow() { + return; + } + changed_rx.changed().await.unwrap(); + } + } + + async fn start_wait(&self) { + self.wait_for_start.0.send(true).unwrap(); + } + async fn done_waiting(&self) { + self.wait_for_start.0.send(false).unwrap(); + } +} +impl Drop for PersistantContainer { + fn drop(&mut self) { + self.should_stop_running.store(true, Ordering::SeqCst); + let container_name = self.container_name.clone(); + let running_docker = self.running_docker.clone(); + tokio::spawn(async move { + let mut running_docker = running_docker.lock().await; + *running_docker = None; + + use std::process::Command; + if let Err(_err) = Command::new("docker") + .args(["kill", &*container_name]) + .output() + {} + }); + } +} + +async fn persistant_container( + thread_shared: Arc, + container: Arc, +) { + let main_docker_procedure_for_long = injectable_main(&thread_shared); + match main_docker_procedure_for_long { + Some(main) => loop { + if container.should_stop_running.load(Ordering::SeqCst) { + return; + } + container.start_wait().await; + match run_persistant_container(&thread_shared, container.clone(), main.clone()).await { + Ok(_) => (), + Err(e) => { + tracing::error!("failed to start persistant container: {}", e); + tracing::debug!("{:?}", e); + } + } + }, + None => futures::future::pending().await, + } +} + +fn injectable_main(thread_shared: &Arc) -> Option> { + if let ( + PackageProcedure::DockerInject(DockerInject { + system, + entrypoint, + args, + io_format, + sigterm_timeout, + }), + Some(DockerContainer { + image, + mounts, + shm_size_mb, + }), + ) = ( + &thread_shared.manifest.main, + &thread_shared.manifest.container, + ) { + Some(Arc::new(DockerProcedure { + image: image.clone(), + mounts: mounts.clone(), + io_format: *io_format, + shm_size_mb: *shm_size_mb, + sigterm_timeout: *sigterm_timeout, + system: *system, + entrypoint: "sleep".to_string(), + args: vec!["infinity".to_string()], + })) + } else { + None + } +} +fn check_is_injectable_main(thread_shared: &ManagerSharedState) -> bool { + match &thread_shared.manifest.main { + PackageProcedure::Docker(_a) => false, + PackageProcedure::DockerInject(a) => true, + #[cfg(feature = "js_engine")] + PackageProcedure::Script(_) => false, + } +} +async fn run_persistant_container( + state: &Arc, + persistant: Arc, + docker_procedure: Arc, +) -> Result<(), Error> { + let interfaces = states_main_interfaces(state)?; + let generated_certificate = generate_certificate(state, &interfaces).await?; + let mut runtime = tokio::spawn(long_running_docker(state.clone(), docker_procedure)); + + let ip = match get_running_ip(state, &mut runtime).await { + GetRunninIp::Ip(x) => x, + GetRunninIp::Error(e) => return Err(e), + GetRunninIp::EarlyExit(e) => { + tracing::error!("Early Exit"); + tracing::debug!("{:?}", e); + return Ok(()); + } + }; + persistant.done_waiting().await; + add_network_for_main(state, ip, interfaces, generated_certificate).await?; + + fetch_starting_to_running(state); + let res = tokio::select! { + a = runtime => a.map_err(|_| Error::new(eyre!("Manager runtime panicked!"), crate::ErrorKind::Docker)).map(|_| ()), + }; + remove_network_for_main(state, ip).await?; + res +} + +async fn long_running_docker( + rt_state: Arc, + main_status: Arc, +) -> Result, Error> { + main_status + .execute::<(), NoOutput>( + &rt_state.ctx, + &rt_state.manifest.id, + &rt_state.manifest.version, + ProcedureName::LongRunning, + &rt_state.manifest.volumes, + None, + None, + ) + .await +} + +async fn remove_network_for_main( + state: &Arc, + ip: std::net::Ipv4Addr, +) -> Result<(), Error> { + state + .ctx + .net_controller + .remove( + &state.manifest.id, + ip, + state.manifest.interfaces.0.keys().cloned(), + ) + .await?; + Ok(()) +} + +fn fetch_starting_to_running(state: &Arc) { + let _ = state + .status + .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |x| { + if x == Status::Starting as usize { + Some(Status::Running as usize) + } else { + None + } + }); +} + +async fn main_health_check_daemon(state: Arc) { + tokio::time::sleep(Duration::from_secs(10)).await; + loop { + let mut db = state.ctx.db.handle(); + if let Err(e) = health::check( + &state.ctx, + &mut db, + &state.manifest.id, + &state.commit_health_check_results, + ) + .await + { + tracing::error!( + "Failed to run health check for {}: {}", + &state.manifest.id, + e + ); + tracing::debug!("{:?}", e); + } + tokio::time::sleep(Duration::from_secs(HEALTH_CHECK_COOLDOWN_SECONDS)).await; + } +} + +fn set_commit_health_true(state: &Arc) { + state + .commit_health_check_results + .store(true, Ordering::SeqCst); +} + +async fn add_network_for_main( + state: &Arc, + ip: std::net::Ipv4Addr, + interfaces: Vec<( + InterfaceId, + &crate::net::interface::Interface, + TorSecretKeyV3, + )>, + generated_certificate: GeneratedCertificateMountPoint, +) -> Result<(), Error> { + state + .ctx + .net_controller + .add(&state.manifest.id, ip, interfaces, generated_certificate) + .await?; + Ok(()) +} + +enum GetRunninIp { + Ip(Ipv4Addr), + Error(Error), + EarlyExit(Result), +} + +async fn get_running_ip( + state: &Arc, + mut runtime: &mut tokio::task::JoinHandle, Error>>, +) -> GetRunninIp { + loop { + match container_inspect(state).await { + Ok(res) => { + match res + .network_settings + .and_then(|ns| ns.networks) + .and_then(|mut n| n.remove("start9")) + .and_then(|es| es.ip_address) + .filter(|ip| !ip.is_empty()) + .map(|ip| ip.parse()) + .transpose() + { + Ok(Some(ip_addr)) => return GetRunninIp::Ip(ip_addr), + Ok(None) => (), + Err(e) => return GetRunninIp::Error(e.into()), + } + } + Err(bollard::errors::Error::DockerResponseServerError { + status_code: 404, // NOT FOUND + .. + }) => (), + Err(e) => return GetRunninIp::Error(e.into()), + } + match futures::poll!(&mut runtime) { + Poll::Ready(res) => match res { + Ok(Ok(response)) => return GetRunninIp::EarlyExit(response), + Err(_) | Ok(Err(_)) => { + return GetRunninIp::Error(Error::new( + eyre!("Manager runtime panicked!"), + crate::ErrorKind::Docker, + )) + } + }, + _ => (), + } + } +} + +async fn container_inspect( + state: &Arc, +) -> Result { + state + .ctx + .docker + .inspect_container(&state.container_name, None) + .await +} + +async fn generate_certificate( + state: &Arc, + interfaces: &Vec<( + InterfaceId, + &crate::net::interface::Interface, + TorSecretKeyV3, + )>, +) -> Result { + Ok(state + .ctx + .net_controller + .generate_certificate_mountpoint(&state.manifest.id, interfaces) + .await?) +} + +fn states_main_interfaces( + state: &Arc, +) -> Result< + Vec<( + InterfaceId, + &crate::net::interface::Interface, + TorSecretKeyV3, + )>, + Error, +> { + Ok(state + .manifest + .interfaces + .0 + .iter() + .map(|(id, info)| { + Ok(( + id.clone(), + info, + state + .tor_keys + .get(id) + .ok_or_else(|| { + Error::new(eyre!("interface {} missing key", id), crate::ErrorKind::Tor) + })? + .clone(), + )) + }) + .collect::, Error>>()?) +} + #[instrument(skip(shared))] async fn stop(shared: &ManagerSharedState) -> Result<(), Error> { shared @@ -563,40 +893,50 @@ async fn stop(shared: &ManagerSharedState) -> Result<(), Error> { ) { resume(shared).await?; } - let action = match &shared.manifest.main { - PackageProcedure::Docker(a) => a, + match &shared.manifest.main { + PackageProcedure::Docker(DockerProcedure { + sigterm_timeout, .. + }) + | PackageProcedure::DockerInject(DockerInject { + sigterm_timeout, .. + }) => { + if !check_is_injectable_main(shared) { + match shared + .ctx + .docker + .stop_container( + &shared.container_name, + Some(StopContainerOptions { + t: sigterm_timeout + .map(|a| *a) + .unwrap_or(Duration::from_secs(30)) + .as_secs_f64() as i64, + }), + ) + .await + { + Err(bollard::errors::Error::DockerResponseServerError { + status_code: 404, // NOT FOUND + .. + }) + | Err(bollard::errors::Error::DockerResponseServerError { + status_code: 409, // CONFLICT + .. + }) + | Err(bollard::errors::Error::DockerResponseServerError { + status_code: 304, // NOT MODIFIED + .. + }) => (), // Already stopped + a => a?, + }; + } else { + stop_non_first(&shared.container_name).await; + } + } #[cfg(feature = "js_engine")] PackageProcedure::Script(_) => return Ok(()), }; - match shared - .ctx - .docker - .stop_container( - &shared.container_name, - Some(StopContainerOptions { - t: action - .sigterm_timeout - .map(|a| *a) - .unwrap_or(Duration::from_secs(30)) - .as_secs_f64() as i64, - }), - ) - .await - { - Err(bollard::errors::Error::DockerResponseServerError { - status_code: 404, // NOT FOUND - .. - }) - | Err(bollard::errors::Error::DockerResponseServerError { - status_code: 409, // CONFLICT - .. - }) - | Err(bollard::errors::Error::DockerResponseServerError { - status_code: 304, // NOT MODIFIED - .. - }) => (), // Already stopped - a => a?, - }; + tracing::debug!("Stopping a docker"); shared.status.store( Status::Stopped as usize, std::sync::atomic::Ordering::SeqCst, @@ -604,6 +944,44 @@ async fn stop(shared: &ManagerSharedState) -> Result<(), Error> { Ok(()) } +/// So the sleep infinity, which is the long running, is pid 1. So we kill the others +async fn stop_non_first(container_name: &str) { + // tracing::error!("BLUJ TODO: sudo docker exec {} sh -c \"ps ax | awk '\\$1 ~ /^[:0-9:]/ && \\$1 > 1 {{print \\$1}}' | xargs kill\"", container_name); + + // (sleep infinity) & export RUNNING=$! && echo $! && (wait $RUNNING && echo "DONE FOR $RUNNING") & + // (RUNNING=$(sleep infinity & echo $!); echo "running $RUNNING"; wait $RUNNING; echo "DONE FOR ?") & + + let _ = tokio::process::Command::new("docker") + .args([ + "container", + "exec", + container_name, + "sh", + "-c", + "ps ax | awk '$1 ~ /^[:0-9:]/ && $1 > 1 {print $1}' | xargs kill", + ]) + .output() + .await; +} + +// #[test] +// fn test_stop_non_first() { +// assert_eq!( +// &format!( +// "{}", +// tokio::process::Command::new("docker").args([ +// "container", +// "exec", +// "container_name", +// "sh", +// "-c", +// "ps ax | awk \"\\$1 ~ /^[:0-9:]/ && \\$1 > 1 {print \\$1}\"| xargs kill", +// ]) +// ), +// "" +// ); +// } + #[instrument(skip(shared))] async fn start(shared: &ManagerSharedState) -> Result<(), Error> { shared.on_stop.send(OnStop::Restart).map_err(|_| { diff --git a/backend/src/migration.rs b/backend/src/migration.rs index f4744b033..2b65df438 100644 --- a/backend/src/migration.rs +++ b/backend/src/migration.rs @@ -8,12 +8,12 @@ use patch_db::HasModel; use serde::{Deserialize, Serialize}; use tracing::instrument; -use crate::context::RpcContext; use crate::id::ImageId; use crate::procedure::{PackageProcedure, ProcedureName}; use crate::s9pk::manifest::PackageId; use crate::util::Version; use crate::volume::Volumes; +use crate::{context::RpcContext, procedure::docker::DockerContainer}; use crate::{Error, ResultExt}; #[derive(Clone, Debug, Default, Deserialize, Serialize, HasModel)] @@ -26,13 +26,14 @@ impl Migrations { #[instrument] pub fn validate( &self, + container: &Option, eos_version: &Version, volumes: &Volumes, image_ids: &BTreeSet, ) -> Result<(), Error> { for (version, migration) in &self.from { migration - .validate(eos_version, volumes, image_ids, true) + .validate(container, eos_version, volumes, image_ids, true) .with_ctx(|_| { ( crate::ErrorKind::ValidateS9pk, @@ -42,7 +43,7 @@ impl Migrations { } for (version, migration) in &self.to { migration - .validate(eos_version, volumes, image_ids, true) + .validate(container, eos_version, volumes, image_ids, true) .with_ctx(|_| { ( crate::ErrorKind::ValidateS9pk, @@ -56,6 +57,7 @@ impl Migrations { #[instrument(skip(ctx))] pub fn from<'a>( &'a self, + container: &'a Option, ctx: &'a RpcContext, version: &'a Version, pkg_id: &'a PackageId, @@ -71,12 +73,12 @@ impl Migrations { migration .execute( ctx, + container, pkg_id, pkg_version, ProcedureName::Migration, // Migrations cannot be executed concurrently volumes, Some(version), - false, None, ) .map(|r| { @@ -95,6 +97,7 @@ impl Migrations { #[instrument(skip(ctx))] pub fn to<'a>( &'a self, + container: &'a Option, ctx: &'a RpcContext, version: &'a Version, pkg_id: &'a PackageId, @@ -106,12 +109,12 @@ impl Migrations { migration .execute( ctx, + container, pkg_id, pkg_version, ProcedureName::Migration, volumes, Some(version), - false, None, ) .map(|r| { diff --git a/backend/src/net/tor.rs b/backend/src/net/tor.rs index 590bba2a4..7fa105337 100644 --- a/backend/src/net/tor.rs +++ b/backend/src/net/tor.rs @@ -425,7 +425,7 @@ async fn test() { fn(AsyncEvent<'static>) -> BoxFuture<'static, Result<(), ConnError>>, > = conn.into_authenticated().await; let tor_key = torut::onion::TorSecretKeyV3::generate(); - dbg!(connection.get_conf("SocksPort").await.unwrap()); + connection.get_conf("SocksPort").await.unwrap(); connection .add_onion_v3( &tor_key, diff --git a/backend/src/procedure/docker.rs b/backend/src/procedure/docker.rs index 751b01def..e9460fc1b 100644 --- a/backend/src/procedure/docker.rs +++ b/backend/src/procedure/docker.rs @@ -41,6 +41,16 @@ lazy_static::lazy_static! { }; } +#[derive(Clone, Debug, Deserialize, Serialize, patch_db::HasModel)] +#[serde(rename_all = "kebab-case")] +pub struct DockerContainer { + pub image: ImageId, + #[serde(default)] + pub mounts: BTreeMap, + #[serde(default)] + pub shm_size_mb: Option, // TODO: use postfix sizing? like 1k vs 1m vs 1g +} + #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(rename_all = "kebab-case")] pub struct DockerProcedure { @@ -55,12 +65,40 @@ pub struct DockerProcedure { #[serde(default)] pub io_format: Option, #[serde(default)] - pub inject: bool, + pub sigterm_timeout: Option, #[serde(default)] pub shm_size_mb: Option, // TODO: use postfix sizing? like 1k vs 1m vs 1g +} + +#[derive(Clone, Debug, Deserialize, Serialize)] +#[serde(rename_all = "kebab-case")] +pub struct DockerInject { + #[serde(default)] + pub system: bool, + pub entrypoint: String, + #[serde(default)] + pub args: Vec, + #[serde(default)] + pub io_format: Option, #[serde(default)] pub sigterm_timeout: Option, } + +impl From<(&DockerContainer, &DockerInject)> for DockerProcedure { + fn from((container, injectable): (&DockerContainer, &DockerInject)) -> Self { + DockerProcedure { + image: container.image.clone(), + system: injectable.system.clone(), + entrypoint: injectable.entrypoint.clone(), + args: injectable.args.clone(), + mounts: container.mounts.clone(), + io_format: injectable.io_format.clone(), + sigterm_timeout: injectable.sigterm_timeout.clone(), + shm_size_mb: container.shm_size_mb.clone(), + } + } +} + impl DockerProcedure { pub fn validate( &self, @@ -86,12 +124,6 @@ impl DockerProcedure { if expected_io && self.io_format.is_none() { color_eyre::eyre::bail!("expected io-format"); } - if &**eos_version >= &emver::Version::new(0, 3, 1, 1) - && self.inject - && !self.mounts.is_empty() - { - color_eyre::eyre::bail!("mounts not allowed in inject actions"); - } Ok(()) } @@ -104,48 +136,196 @@ impl DockerProcedure { name: ProcedureName, volumes: &Volumes, input: Option, - allow_inject: bool, timeout: Option, ) -> Result, Error> { let name = name.docker_name(); let name: Option<&str> = name.as_ref().map(|x| &**x); let mut cmd = tokio::process::Command::new("docker"); - if self.inject && allow_inject { - cmd.arg("exec"); + tracing::debug!("{:?} is run", name); + let container_name = Self::container_name(pkg_id, name); + cmd.arg("run") + .arg("--rm") + .arg("--network=start9") + .arg(format!("--add-host=embassy:{}", Ipv4Addr::from(HOST_IP))) + .arg("--name") + .arg(&container_name) + .arg(format!("--hostname={}", &container_name)) + .arg("--no-healthcheck"); + match ctx + .docker + .remove_container( + &container_name, + Some(RemoveContainerOptions { + v: false, + force: true, + link: false, + }), + ) + .await + { + Ok(()) + | Err(bollard::errors::Error::DockerResponseServerError { + status_code: 404, // NOT FOUND + .. + }) => Ok(()), + Err(e) => Err(e), + }?; + cmd.args(self.docker_args(ctx, pkg_id, pkg_version, volumes).await?); + let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) { + cmd.stdin(std::process::Stdio::piped()); + Some(format.to_vec(input)?) } else { - let container_name = Self::container_name(pkg_id, name); - cmd.arg("run") - .arg("--rm") - .arg("--network=start9") - .arg(format!("--add-host=embassy:{}", Ipv4Addr::from(HOST_IP))) - .arg("--name") - .arg(&container_name) - .arg(format!("--hostname={}", &container_name)) - .arg("--no-healthcheck"); - match ctx - .docker - .remove_container( - &container_name, - Some(RemoveContainerOptions { - v: false, - force: true, - link: false, - }), - ) - .await - { + None + }; + cmd.stdout(std::process::Stdio::piped()); + cmd.stderr(std::process::Stdio::piped()); + tracing::trace!( + "{}", + format!("{:?}", cmd) + .split(r#"" ""#) + .collect::>() + .join(" ") + ); + let mut handle = cmd.spawn().with_kind(crate::ErrorKind::Docker)?; + let id = handle.id(); + let timeout_fut = if let Some(timeout) = timeout { + EitherFuture::Right(async move { + tokio::time::sleep(timeout).await; + Ok(()) - | Err(bollard::errors::Error::DockerResponseServerError { - status_code: 404, // NOT FOUND - .. - }) => Ok(()), - Err(e) => Err(e), - }?; + }) + } else { + EitherFuture::Left(futures::future::pending::>()) + }; + if let (Some(input), Some(mut stdin)) = (&input_buf, handle.stdin.take()) { + use tokio::io::AsyncWriteExt; + stdin + .write_all(input) + .await + .with_kind(crate::ErrorKind::Docker)?; + stdin.flush().await?; + stdin.shutdown().await?; + drop(stdin); } - cmd.args( - self.docker_args(ctx, pkg_id, pkg_version, volumes, allow_inject) - .await, + enum Race { + Done(T), + TimedOut, + } + + let io_format = self.io_format; + let mut output = BufReader::new( + handle + .stdout + .take() + .ok_or_else(|| eyre!("Can't takeout stout")) + .with_kind(crate::ErrorKind::Docker)?, + ); + let output = NonDetachingJoinHandle::from(tokio::spawn(async move { + match async { + if let Some(format) = io_format { + return match max_by_lines(&mut output, None).await { + MaxByLines::Done(buffer) => { + Ok::( + match format.from_slice(buffer.as_bytes()) { + Ok(a) => a, + Err(e) => { + tracing::trace!( + "Failed to deserialize stdout from {}: {}, falling back to UTF-8 string.", + format, + e + ); + Value::String(buffer) + } + }, + ) + }, + MaxByLines::Error(e) => Err(e), + MaxByLines::Overflow(buffer) => Ok(Value::String(buffer)) + } + } + + let lines = buf_reader_to_lines(&mut output, 1000).await?; + if lines.is_empty() { + return Ok(Value::Null); + } + + let joined_output = lines.join("\n"); + Ok(Value::String(joined_output)) + }.await { + Ok(a) => Ok((a, output)), + Err(e) => Err((e, output)) + } + })); + let err_output = BufReader::new( + handle + .stderr + .take() + .ok_or_else(|| eyre!("Can't takeout std err")) + .with_kind(crate::ErrorKind::Docker)?, ); + + let err_output = NonDetachingJoinHandle::from(tokio::spawn(async move { + let lines = buf_reader_to_lines(err_output, 1000).await?; + let joined_output = lines.join("\n"); + Ok::<_, Error>(joined_output) + })); + + let res = tokio::select! { + res = handle.wait() => Race::Done(res.with_kind(crate::ErrorKind::Docker)?), + res = timeout_fut => { + res?; + Race::TimedOut + }, + }; + let exit_status = match res { + Race::Done(x) => x, + Race::TimedOut => { + if let Some(id) = id { + signal::kill(Pid::from_raw(id as i32), signal::SIGKILL) + .with_kind(crate::ErrorKind::Docker)?; + } + return Ok(Err((143, "Timed out. Retrying soon...".to_owned()))); + } + }; + Ok( + if exit_status.success() || exit_status.code() == Some(143) { + Ok(serde_json::from_value( + output + .await + .with_kind(crate::ErrorKind::Unknown)? + .map(|(v, _)| v) + .map_err(|(e, _)| tracing::warn!("{}", e)) + .unwrap_or_default(), + ) + .with_kind(crate::ErrorKind::Deserialization)?) + } else { + Err(( + exit_status.code().unwrap_or_default(), + err_output.await.with_kind(crate::ErrorKind::Unknown)??, + )) + }, + ) + } + + #[instrument(skip(ctx, input))] + pub async fn inject Deserialize<'de>>( + &self, + ctx: &RpcContext, + pkg_id: &PackageId, + pkg_version: &Version, + name: ProcedureName, + volumes: &Volumes, + input: Option, + timeout: Option, + ) -> Result, Error> { + let name = name.docker_name(); + let name: Option<&str> = name.as_ref().map(|x| &**x); + let mut cmd = tokio::process::Command::new("docker"); + + tracing::debug!("{:?} is exec", name); + cmd.arg("exec"); + + cmd.args(self.docker_args_inject(ctx, pkg_id, pkg_version).await?); let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) { cmd.stdin(std::process::Stdio::piped()); Some(format.to_vec(input)?) @@ -295,8 +475,8 @@ impl DockerProcedure { let mut cmd = tokio::process::Command::new("docker"); cmd.arg("run").arg("--rm").arg("--network=none"); cmd.args( - self.docker_args(ctx, pkg_id, pkg_version, &volumes.to_readonly(), false) - .await, + self.docker_args(ctx, pkg_id, pkg_version, &volumes.to_readonly()) + .await?, ); let input_buf = if let (Some(input), Some(format)) = (&input, &self.io_format) { cmd.stdin(std::process::Stdio::piped()); @@ -418,14 +598,8 @@ impl DockerProcedure { pkg_id: &PackageId, pkg_version: &Version, volumes: &Volumes, - allow_inject: bool, - ) -> Vec> { - let mut res = Vec::with_capacity( - (2 * self.mounts.len()) // --mount - + (2 * self.shm_size_mb.is_some() as usize) // --shm-size - + 5 // --interactive --log-driver=journald --entrypoint - + self.args.len(), // [ARG...] - ); + ) -> Result>, Error> { + let mut res = self.new_docker_args(); for (volume_id, dst) in &self.mounts { let volume = if let Some(v) = volumes.get(volume_id) { v @@ -434,8 +608,7 @@ impl DockerProcedure { }; let src = volume.path_for(&ctx.datadir, pkg_id, pkg_version, volume_id); if let Err(e) = tokio::fs::metadata(&src).await { - tracing::warn!("{} not mounted to container: {}", src.display(), e); - continue; + tokio::fs::create_dir_all(&src).await?; } res.push(OsStr::new("--mount").into()); res.push( @@ -453,22 +626,48 @@ impl DockerProcedure { res.push(OsString::from(format!("{}m", shm_size_mb)).into()); } res.push(OsStr::new("--interactive").into()); - if self.inject && allow_inject { - res.push(OsString::from(Self::container_name(pkg_id, None)).into()); - res.push(OsStr::new(&self.entrypoint).into()); + + res.push(OsStr::new("--log-driver=journald").into()); + res.push(OsStr::new("--entrypoint").into()); + res.push(OsStr::new(&self.entrypoint).into()); + if self.system { + res.push(OsString::from(self.image.for_package(SYSTEM_PACKAGE_ID, None)).into()); } else { - res.push(OsStr::new("--log-driver=journald").into()); - res.push(OsStr::new("--entrypoint").into()); - res.push(OsStr::new(&self.entrypoint).into()); - if self.system { - res.push(OsString::from(self.image.for_package(SYSTEM_PACKAGE_ID, None)).into()); - } else { - res.push(OsString::from(self.image.for_package(pkg_id, Some(pkg_version))).into()); - } + res.push(OsString::from(self.image.for_package(pkg_id, Some(pkg_version))).into()); + } + + res.extend(self.args.iter().map(|s| OsStr::new(s).into())); + + Ok(res) + } + + fn new_docker_args(&self) -> Vec> { + Vec::with_capacity( + (2 * self.mounts.len()) // --mount + + (2 * self.shm_size_mb.is_some() as usize) // --shm-size + + 5 // --interactive --log-driver=journald --entrypoint + + self.args.len(), // [ARG...] + ) + } + async fn docker_args_inject( + &self, + ctx: &RpcContext, + pkg_id: &PackageId, + pkg_version: &Version, + ) -> Result>, Error> { + let mut res = self.new_docker_args(); + if let Some(shm_size_mb) = self.shm_size_mb { + res.push(OsStr::new("--shm-size").into()); + res.push(OsString::from(format!("{}m", shm_size_mb)).into()); } + res.push(OsStr::new("--interactive").into()); + + res.push(OsString::from(Self::container_name(pkg_id, None)).into()); + res.push(OsStr::new(&self.entrypoint).into()); + res.extend(self.args.iter().map(|s| OsStr::new(s).into())); - res + Ok(res) } } diff --git a/backend/src/procedure/mod.rs b/backend/src/procedure/mod.rs index 279fbd910..bbdfaedb2 100644 --- a/backend/src/procedure/mod.rs +++ b/backend/src/procedure/mod.rs @@ -1,11 +1,12 @@ use std::collections::BTreeSet; use std::time::Duration; +use color_eyre::eyre::{bail, eyre}; use patch_db::HasModel; use serde::{Deserialize, Serialize}; use tracing::instrument; -use self::docker::DockerProcedure; +use self::docker::{DockerContainer, DockerInject, DockerProcedure}; use crate::context::RpcContext; use crate::id::ImageId; use crate::s9pk::manifest::PackageId; @@ -25,10 +26,12 @@ pub use models::ProcedureName; #[serde(tag = "type")] pub enum PackageProcedure { Docker(DockerProcedure), + DockerInject(DockerInject), #[cfg(feature = "js_engine")] Script(js_scripts::JsProcedure), } + impl PackageProcedure { pub fn is_script(&self) -> bool { match self { @@ -40,6 +43,7 @@ impl PackageProcedure { #[instrument] pub fn validate( &self, + container: &Option, eos_version: &Version, volumes: &Volumes, image_ids: &BTreeSet, @@ -49,40 +53,95 @@ impl PackageProcedure { PackageProcedure::Docker(action) => { action.validate(eos_version, volumes, image_ids, expected_io) } + PackageProcedure::DockerInject(injectable) => { + let container = match container { + None => bail!("For the docker injectable procedure, a container must be exist on the config"), + Some(container) => container, + } ; + let docker_procedure: DockerProcedure = (container, injectable).into(); + docker_procedure.validate(eos_version, volumes, image_ids, expected_io) + } #[cfg(feature = "js_engine")] PackageProcedure::Script(action) => action.validate(volumes), } } - #[instrument(skip(ctx, input))] + #[instrument(skip(ctx, input, container))] pub async fn execute Deserialize<'de>>( &self, ctx: &RpcContext, + container: &Option, pkg_id: &PackageId, pkg_version: &Version, name: ProcedureName, volumes: &Volumes, input: Option, - allow_inject: bool, timeout: Option, ) -> Result, Error> { tracing::trace!("Procedure execute {} {} - {:?}", self, pkg_id, name); match self { PackageProcedure::Docker(procedure) => { + procedure + .execute(ctx, pkg_id, pkg_version, name, volumes, input, timeout) + .await + } + PackageProcedure::DockerInject(injectable) => { + let container = match container { + None => return Err(Error::new(eyre!("For the docker injectable procedure, a container must be exist on the config"), crate::ErrorKind::Action)), + Some(container) => container, + } ; + let docker_procedure: DockerProcedure = (container, injectable).into(); + docker_procedure + .inject(ctx, pkg_id, pkg_version, name, volumes, input, timeout) + .await + } + #[cfg(feature = "js_engine")] + PackageProcedure::Script(procedure) => { procedure .execute( - ctx, + &ctx.datadir, pkg_id, pkg_version, name, volumes, input, - allow_inject, timeout, ) .await } + } + } + + #[instrument(skip(ctx, input, container))] + pub async fn inject Deserialize<'de>>( + &self, + ctx: &RpcContext, + container: &Option, + pkg_id: &PackageId, + pkg_version: &Version, + name: ProcedureName, + volumes: &Volumes, + input: Option, + timeout: Option, + ) -> Result, Error> { + tracing::trace!("Procedure inject {} {} - {:?}", self, pkg_id, name); + match self { + PackageProcedure::Docker(procedure) => { + procedure + .inject(ctx, pkg_id, pkg_version, name, volumes, input, timeout) + .await + } + PackageProcedure::DockerInject(injectable) => { + let container = match container { + None => return Err(Error::new(eyre!("For the docker injectable procedure, a container must be exist on the config"), crate::ErrorKind::Action)), + Some(container) => container, + } ; + let docker_procedure: DockerProcedure = (container, injectable).into(); + docker_procedure + .inject(ctx, pkg_id, pkg_version, name, volumes, input, timeout) + .await + } #[cfg(feature = "js_engine")] PackageProcedure::Script(procedure) => { procedure @@ -102,6 +161,7 @@ impl PackageProcedure { #[instrument(skip(ctx, input))] pub async fn sandboxed Deserialize<'de>>( &self, + container: &Option, ctx: &RpcContext, pkg_id: &PackageId, pkg_version: &Version, @@ -117,6 +177,16 @@ impl PackageProcedure { .sandboxed(ctx, pkg_id, pkg_version, volumes, input, timeout) .await } + PackageProcedure::DockerInject(injectable) => { + let container = match container { + None => return Err(Error::new(eyre!("For the docker injectable procedure, a container must be exist on the config"), crate::ErrorKind::Action)), + Some(container) => container, + } ; + let docker_procedure: DockerProcedure = (container, injectable).into(); + docker_procedure + .sandboxed(ctx, pkg_id, pkg_version, volumes, input, timeout) + .await + } #[cfg(feature = "js_engine")] PackageProcedure::Script(procedure) => { procedure @@ -130,6 +200,7 @@ impl PackageProcedure { impl std::fmt::Display for PackageProcedure { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { + PackageProcedure::DockerInject(_) => write!(f, "Docker Injectable")?, PackageProcedure::Docker(_) => write!(f, "Docker")?, #[cfg(feature = "js_engine")] PackageProcedure::Script(_) => write!(f, "JS")?, diff --git a/backend/src/properties.rs b/backend/src/properties.rs index 0a70e6e41..72a977063 100644 --- a/backend/src/properties.rs +++ b/backend/src/properties.rs @@ -34,12 +34,12 @@ pub async fn fetch_properties(ctx: RpcContext, id: PackageId) -> Result( &ctx, + &manifest.container, &manifest.id, &manifest.version, ProcedureName::Properties, &manifest.volumes, None, - false, None, ) .await? diff --git a/backend/src/s9pk/manifest.rs b/backend/src/s9pk/manifest.rs index 217bdca61..3abae432c 100644 --- a/backend/src/s9pk/manifest.rs +++ b/backend/src/s9pk/manifest.rs @@ -6,7 +6,6 @@ use patch_db::HasModel; use serde::{Deserialize, Serialize}; use url::Url; -use crate::action::Actions; use crate::backup::BackupActions; use crate::config::action::ConfigActions; use crate::dependencies::Dependencies; @@ -18,6 +17,7 @@ use crate::util::Version; use crate::version::{Current, VersionT}; use crate::volume::Volumes; use crate::Error; +use crate::{action::Actions, procedure::docker::DockerContainer}; fn current_version() -> Version { Current::new().semver().into() @@ -70,6 +70,8 @@ pub struct Manifest { #[serde(default)] #[model] pub dependencies: Dependencies, + #[model] + pub container: Option, } impl Manifest { diff --git a/backend/src/s9pk/mod.rs b/backend/src/s9pk/mod.rs index 177053218..a0dadb161 100644 --- a/backend/src/s9pk/mod.rs +++ b/backend/src/s9pk/mod.rs @@ -2,17 +2,18 @@ use std::path::PathBuf; use color_eyre::eyre::eyre; use imbl::OrdMap; +use patch_db::{LockReceipt, LockType}; use rpc_toolkit::command; use serde_json::Value; use tracing::instrument; -use crate::context::SdkContext; use crate::s9pk::builder::S9pkPacker; use crate::s9pk::manifest::Manifest; use crate::s9pk::reader::S9pkReader; use crate::util::display_none; use crate::util::serde::IoFormat; use crate::volume::Volume; +use crate::{context::SdkContext, procedure::docker::DockerContainer}; use crate::{Error, ErrorKind, ResultExt}; pub mod builder; diff --git a/backend/src/s9pk/reader.rs b/backend/src/s9pk/reader.rs index d69e7b20b..a1591deec 100644 --- a/backend/src/s9pk/reader.rs +++ b/backend/src/s9pk/reader.rs @@ -17,9 +17,9 @@ use tracing::instrument; use super::header::{FileSection, Header, TableOfContents}; use super::manifest::{Manifest, PackageId}; use super::SIG_CONTEXT; -use crate::id::ImageId; use crate::install::progress::InstallProgressTracker; use crate::util::Version; +use crate::{id::ImageId, procedure::docker::DockerContainer}; use crate::{Error, ResultExt}; #[pin_project::pin_project] @@ -145,6 +145,7 @@ impl S9pkReader { } let image_tags = self.image_tags().await?; let man = self.manifest().await?; + let container = &man.container; let validated_image_ids = image_tags .into_iter() .map(|i| i.validate(&man.id, &man.version).map(|_| i.image_id)) @@ -154,25 +155,59 @@ impl S9pkReader { .0 .iter() .map(|(_, action)| { - action.validate(&man.eos_version, &man.volumes, &validated_image_ids) + action.validate( + container, + &man.eos_version, + &man.volumes, + &validated_image_ids, + ) }) .collect::>()?; - man.backup - .validate(&man.eos_version, &man.volumes, &validated_image_ids)?; + man.backup.validate( + container, + &man.eos_version, + &man.volumes, + &validated_image_ids, + )?; if let Some(cfg) = &man.config { - cfg.validate(&man.eos_version, &man.volumes, &validated_image_ids)?; + cfg.validate( + container, + &man.eos_version, + &man.volumes, + &validated_image_ids, + )?; } - man.health_checks - .validate(&man.eos_version, &man.volumes, &validated_image_ids)?; + man.health_checks.validate( + container, + &man.eos_version, + &man.volumes, + &validated_image_ids, + )?; man.interfaces.validate()?; man.main - .validate(&man.eos_version, &man.volumes, &validated_image_ids, false) + .validate( + container, + &man.eos_version, + &man.volumes, + &validated_image_ids, + false, + ) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Main"))?; - man.migrations - .validate(&man.eos_version, &man.volumes, &validated_image_ids)?; + man.migrations.validate( + container, + &man.eos_version, + &man.volumes, + &validated_image_ids, + )?; if let Some(props) = &man.properties { props - .validate(&man.eos_version, &man.volumes, &validated_image_ids, true) + .validate( + container, + &man.eos_version, + &man.volumes, + &validated_image_ids, + true, + ) .with_ctx(|_| (crate::ErrorKind::ValidateS9pk, "Properties"))?; } man.volumes.validate(&man.interfaces)?; diff --git a/backend/src/status/health_check.rs b/backend/src/status/health_check.rs index 246e7a18a..2cb0fad4b 100644 --- a/backend/src/status/health_check.rs +++ b/backend/src/status/health_check.rs @@ -5,13 +5,13 @@ pub use models::HealthCheckId; use serde::{Deserialize, Serialize}; use tracing::instrument; -use crate::context::RpcContext; use crate::id::ImageId; use crate::procedure::{NoOutput, PackageProcedure, ProcedureName}; use crate::s9pk::manifest::PackageId; use crate::util::serde::Duration; use crate::util::Version; use crate::volume::Volumes; +use crate::{context::RpcContext, procedure::docker::DockerContainer}; use crate::{Error, ResultExt}; #[derive(Clone, Debug, Deserialize, Serialize)] @@ -20,6 +20,7 @@ impl HealthChecks { #[instrument] pub fn validate( &self, + container: &Option, eos_version: &Version, volumes: &Volumes, image_ids: &BTreeSet, @@ -27,7 +28,7 @@ impl HealthChecks { for (_, check) in &self.0 { check .implementation - .validate(eos_version, &volumes, image_ids, false) + .validate(container, eos_version, &volumes, image_ids, false) .with_ctx(|_| { ( crate::ErrorKind::ValidateS9pk, @@ -40,6 +41,7 @@ impl HealthChecks { pub async fn check_all( &self, ctx: &RpcContext, + container: &Option, started: DateTime, pkg_id: &PackageId, pkg_version: &Version, @@ -49,7 +51,7 @@ impl HealthChecks { Ok::<_, Error>(( id.clone(), check - .check(ctx, id, started, pkg_id, pkg_version, volumes) + .check(ctx, container, id, started, pkg_id, pkg_version, volumes) .await?, )) })) @@ -72,6 +74,7 @@ impl HealthCheck { pub async fn check( &self, ctx: &RpcContext, + container: &Option, id: &HealthCheckId, started: DateTime, pkg_id: &PackageId, @@ -82,12 +85,12 @@ impl HealthCheck { .implementation .execute( ctx, + container, pkg_id, pkg_version, ProcedureName::Health(id.clone()), volumes, Some(Utc::now().signed_duration_since(started).num_milliseconds()), - true, Some( self.timeout .map_or(std::time::Duration::from_secs(30), |d| *d), diff --git a/backend/src/volume.rs b/backend/src/volume.rs index 8339e1564..1e0a1fa46 100644 --- a/backend/src/volume.rs +++ b/backend/src/volume.rs @@ -189,6 +189,29 @@ impl Volume { Volume::Backup { .. } => backup_dir(pkg_id), } } + + pub fn pointer_path(&self, data_dir_path: impl AsRef) -> Option { + if let Volume::Pointer { + path, + package_id, + volume_id, + .. + } = self + { + Some( + data_dir(data_dir_path.as_ref(), package_id, volume_id).join( + if path.is_absolute() { + path.strip_prefix("/").unwrap() + } else { + path.as_ref() + }, + ), + ) + } else { + None + } + } + pub fn set_readonly(&mut self) { match self { Volume::Data { readonly } => { diff --git a/libs/js_engine/src/lib.rs b/libs/js_engine/src/lib.rs index eeaa6619c..6f5810d0c 100644 --- a/libs/js_engine/src/lib.rs +++ b/libs/js_engine/src/lib.rs @@ -39,6 +39,7 @@ pub enum JsError { FileSystem, Code(i32), Timeout, + NotValidProcedureName, } impl JsError { @@ -50,6 +51,7 @@ impl JsError { JsError::BoundryLayerSerDe => 4, JsError::Tokio => 5, JsError::FileSystem => 6, + JsError::NotValidProcedureName => 7, JsError::Code(code) => *code, JsError::Timeout => 143, } @@ -287,7 +289,15 @@ impl JsExecutionEnvironment { let ext_answer_state = answer_state.clone(); let js_ctx = JsContext { datadir: base_directory, - run_function: procedure_name.js_function_name(), + run_function: procedure_name + .js_function_name() + .map(Ok) + .unwrap_or_else(|| { + Err(( + JsError::NotValidProcedureName, + format!("procedure is not value: {:?}", procedure_name), + )) + })?, package_id: self.package_id.clone(), volumes: self.volumes.clone(), version: self.version.clone(), diff --git a/libs/models/src/procedure_name.rs b/libs/models/src/procedure_name.rs index 10b490402..ebe3b6ba1 100644 --- a/libs/models/src/procedure_name.rs +++ b/libs/models/src/procedure_name.rs @@ -9,6 +9,7 @@ pub enum ProcedureName { SetConfig, Migration, Properties, + LongRunning, Check(PackageId), AutoConfig(PackageId), Health(HealthCheckId), @@ -19,6 +20,7 @@ impl ProcedureName { pub fn docker_name(&self) -> Option { match self { ProcedureName::Main => None, + ProcedureName::LongRunning => None, ProcedureName::CreateBackup => Some("CreateBackup".to_string()), ProcedureName::RestoreBackup => Some("RestoreBackup".to_string()), ProcedureName::GetConfig => Some("GetConfig".to_string()), @@ -31,19 +33,20 @@ impl ProcedureName { ProcedureName::AutoConfig(_) => None, } } - pub fn js_function_name(&self) -> String { + pub fn js_function_name(&self) -> Option { match self { - ProcedureName::Main => "/main".to_string(), - ProcedureName::CreateBackup => "/createBackup".to_string(), - ProcedureName::RestoreBackup => "/restoreBackup".to_string(), - ProcedureName::GetConfig => "/getConfig".to_string(), - ProcedureName::SetConfig => "/setConfig".to_string(), - ProcedureName::Migration => "/migration".to_string(), - ProcedureName::Properties => "/properties".to_string(), - ProcedureName::Health(id) => format!("/health/{}", id), - ProcedureName::Action(id) => format!("/action/{}", id), - ProcedureName::Check(id) => format!("/dependencies/{}/check", id), - ProcedureName::AutoConfig(id) => format!("/dependencies/{}/autoConfigure", id), + ProcedureName::Main => None, + ProcedureName::LongRunning => None, + ProcedureName::CreateBackup => Some("/createBackup".to_string()), + ProcedureName::RestoreBackup => Some("/restoreBackup".to_string()), + ProcedureName::GetConfig => Some("/getConfig".to_string()), + ProcedureName::SetConfig => Some("/setConfig".to_string()), + ProcedureName::Migration => Some("/migration".to_string()), + ProcedureName::Properties => Some("/properties".to_string()), + ProcedureName::Health(id) => Some(format!("/health/{}", id)), + ProcedureName::Action(id) => Some(format!("/action/{}", id)), + ProcedureName::Check(id) => Some(format!("/dependencies/{}/check", id)), + ProcedureName::AutoConfig(id) => Some(format!("/dependencies/{}/autoConfigure", id)), } } }