Skip to content

Commit

Permalink
feat(topolotree): new implementation and Hydro Deploy setup (#909)
Browse files Browse the repository at this point in the history
--

Co-authored-by: Saikrishna Achalla <saikrishnaachalla@Saikrishnas-MacBook-Pro.local>
Co-authored-by: zzlk <2418897+zzlk@users.noreply.github.com>
  • Loading branch information
3 people authored Dec 1, 2023
1 parent bc35a5a commit 6158a7a
Show file tree
Hide file tree
Showing 33 changed files with 1,477 additions and 1,019 deletions.
24 changes: 20 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ members = [
"stageleft_test",
"stageleft_test_macro",
"stageleft_tool",
"topolotree",
"variadics",
"website_playground",
]
Expand Down
2 changes: 1 addition & 1 deletion hydro_cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ nanoid = "0.4.0"
ctrlc = "3.2.5"
nix = "0.26.2"
hydroflow_cli_integration = { path = "../hydroflow_cli_integration", version = "^0.3.0" }
indicatif = "0.17.3"
indicatif = "0.17.6"
cargo_metadata = "0.15.4"

[dev-dependencies]
2 changes: 1 addition & 1 deletion hydro_cli/hydro/_core.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class Deployment(object):

def CustomService(self, on: "Host", external_ports: List[int]) -> "CustomService": ...

def HydroflowCrate(self, src: str, on: "Host", example: Optional[str] = None, profile: Optional[str] = None, features: Optional[List[str]] = None, args: Optional[List[str]] = None, display_id: Optional[str] = None, external_ports: Optional[List[int]] = None) -> "HydroflowCrate": ...
def HydroflowCrate(self, src: str, on: "Host", bin: Optional[str] = None, example: Optional[str] = None, profile: Optional[str] = None, features: Optional[List[str]] = None, args: Optional[List[str]] = None, display_id: Optional[str] = None, external_ports: Optional[List[int]] = None) -> "HydroflowCrate": ...

async def deploy(self): ...

Expand Down
4 changes: 3 additions & 1 deletion hydro_cli/src/core/custom_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ impl Service for CustomService {
Ok(())
}

async fn start(&mut self) {}
async fn start(&mut self) -> Result<()> {
Ok(())
}

async fn stop(&mut self) -> Result<()> {
Ok(())
Expand Down
18 changes: 10 additions & 8 deletions hydro_cli/src/core/deployment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub struct Deployment {

impl Deployment {
pub async fn deploy(&mut self) -> Result<()> {
progress::ProgressTracker::with_group("deploy", || async {
progress::ProgressTracker::with_group("deploy", None, || async {
let mut resource_batch = super::ResourceBatch::new();
let active_services = self
.services
Expand All @@ -41,7 +41,7 @@ impl Deployment {
}

let result = Arc::new(
progress::ProgressTracker::with_group("provision", || async {
progress::ProgressTracker::with_group("provision", None, || async {
resource_batch
.provision(&mut self.resource_pool, self.last_resource_result.clone())
.await
Expand All @@ -50,7 +50,7 @@ impl Deployment {
);
self.last_resource_result = Some(result.clone());

progress::ProgressTracker::with_group("provision", || {
progress::ProgressTracker::with_group("provision", None, || {
let hosts_provisioned =
self.hosts
.iter_mut()
Expand All @@ -61,7 +61,7 @@ impl Deployment {
})
.await;

progress::ProgressTracker::with_group("deploy", || {
progress::ProgressTracker::with_group("deploy", None, || {
let services_future =
self.services
.iter_mut()
Expand All @@ -79,7 +79,7 @@ impl Deployment {
})
.await;

progress::ProgressTracker::with_group("ready", || {
progress::ProgressTracker::with_group("ready", None, || {
let all_services_ready =
self.services
.iter()
Expand All @@ -97,7 +97,7 @@ impl Deployment {
.await
}

pub async fn start(&mut self) {
pub async fn start(&mut self) -> Result<()> {
let active_services = self
.services
.iter()
Expand All @@ -110,10 +110,12 @@ impl Deployment {
self.services
.iter()
.map(|service: &Weak<RwLock<dyn Service>>| async {
service.upgrade().unwrap().write().await.start().await;
service.upgrade().unwrap().write().await.start().await?;
Ok(()) as Result<()>
});

futures::future::join_all(all_services_start).await;
futures::future::try_join_all(all_services_start).await?;
Ok(())
}

pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(
Expand Down
67 changes: 38 additions & 29 deletions hydro_cli/src/core/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,35 +92,44 @@ impl LaunchedSSHHost for LaunchedComputeEngine {
22,
);

let res = ProgressTracker::leaf(
format!(
"connecting to host @ {}",
self.external_ip.as_ref().unwrap()
),
async_retry(
&|| async {
let mut config = SessionConfiguration::new();
config.set_compress(true);

let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config)).await?;

session.handshake().await?;

session
.userauth_pubkey_file(
self.user.as_str(),
None,
self.ssh_key_path().as_path(),
None,
)
.await?;

Ok(session)
},
10,
Duration::from_secs(1),
),
let mut attempt_count = 0;

let res = async_retry(
|| {
attempt_count += 1;
ProgressTracker::leaf(
format!(
"connecting to host @ {} (attempt: {})",
self.external_ip.as_ref().unwrap(),
attempt_count
),
async {
let mut config = SessionConfiguration::new();
config.set_compress(true);

let mut session =
AsyncSession::<TcpStream>::connect(target_addr, Some(config)).await?;

tokio::time::timeout(Duration::from_secs(15), async move {
session.handshake().await?;

session
.userauth_pubkey_file(
self.user.as_str(),
None,
self.ssh_key_path().as_path(),
None,
)
.await?;

Ok(session)
})
.await?
},
)
},
10,
Duration::from_secs(1),
)
.await?;

Expand Down
35 changes: 22 additions & 13 deletions hydro_cli/src/core/hydroflow_crate/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,15 @@ use tokio::sync::OnceCell;
use crate::core::progress::ProgressTracker;
use crate::core::HostTargetType;

type CacheKey = (
PathBuf,
Option<String>,
Option<String>,
HostTargetType,
Option<Vec<String>>,
);
#[derive(PartialEq, Eq, Hash)]
struct CacheKey {
src: PathBuf,
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
target_type: HostTargetType,
features: Option<Vec<String>>,
}

pub type BuiltCrate = Arc<(String, Vec<u8>, PathBuf)>;

Expand All @@ -27,18 +29,21 @@ static BUILDS: Lazy<Mutex<HashMap<CacheKey, Arc<OnceCell<BuiltCrate>>>>> =

pub async fn build_crate(
src: PathBuf,
bin: Option<String>,
example: Option<String>,
profile: Option<String>,
target_type: HostTargetType,
features: Option<Vec<String>>,
) -> Result<BuiltCrate> {
let key = (
src.clone(),
example.clone(),
profile.clone(),
let key = CacheKey {
src: src.clone(),
bin: bin.clone(),
example: example.clone(),
profile: profile.clone(),
target_type,
features.clone(),
);
features: features.clone(),
};

let unit_of_work = {
let mut builds = BUILDS.lock().unwrap();
builds.entry(key).or_default().clone()
Expand All @@ -56,6 +61,10 @@ pub async fn build_crate(
profile.unwrap_or("release".to_string()),
]);

if let Some(bin) = bin.as_ref() {
command.args(["--bin", bin]);
}

if let Some(example) = example.as_ref() {
command.args(["--example", example]);
}
Expand Down
Loading

0 comments on commit 6158a7a

Please sign in to comment.