Skip to content

Commit

Permalink
feat: support conda and rsync
Browse files Browse the repository at this point in the history
  • Loading branch information
skyzh committed Mar 4, 2021
1 parent 34eb33f commit 357a6e2
Show file tree
Hide file tree
Showing 14 changed files with 519 additions and 91 deletions.
34 changes: 34 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,4 @@ rusoto_s3 = "0.46.0"
iter-set = "2.0"
tokio-util = "0.6"
url = "2.2"
serde_yaml = "0.8"
124 changes: 124 additions & 0 deletions src/conda.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
use crate::common::{Mission, SnapshotConfig, SnapshotPath, TransferURL};
use crate::error::{Error, Result};
use crate::traits::{SnapshotStorage, SourceStorage};

use async_trait::async_trait;
use futures_util::{stream, StreamExt, TryStreamExt};
use serde::Deserialize;
use serde_json::Value as JsonValue;
use slog::{info, warn};
use structopt::StructOpt;

#[derive(Debug, StructOpt)]
pub struct CondaConfig {
pub repo_config: String,
}

#[derive(Deserialize)]
pub struct CondaRepos {
pub base: String,
pub repos: Vec<String>,
}

pub struct Conda {
config: CondaConfig,
repos: CondaRepos,
}

fn parse_index(data: &[u8]) -> Result<Vec<String>> {
let v: JsonValue = serde_json::from_slice(data)?;
let mut result = vec![];

let package_mapper = |(key, _value): (&String, &JsonValue)| key.clone();

if let Some(JsonValue::Object(map)) = v.get("packages") {
result.append(&mut map.iter().map(package_mapper).collect());
}
if let Some(JsonValue::Object(map)) = v.get("packages.conda") {
result.append(&mut map.iter().map(package_mapper).collect());
}

Ok(result)
}

impl Conda {
pub fn new(config: CondaConfig) -> Self {
let content = std::fs::read(&config.repo_config).unwrap();
let repos = serde_yaml::from_str(std::str::from_utf8(&content).unwrap()).unwrap();
Self { config, repos }
}
}

impl std::fmt::Debug for Conda {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.config.fmt(f)
}
}

#[async_trait]
impl SnapshotStorage<SnapshotPath> for Conda {
async fn snapshot(
&mut self,
mission: Mission,
_config: &SnapshotConfig,
) -> Result<Vec<SnapshotPath>> {
let logger = mission.logger;
let progress = mission.progress;
let client = mission.client;

let fetch = |repo: String| {
info!(logger, "fetching {}", repo);
let progress = progress.clone();
let base = self.repos.base.clone();
let client = client.clone();
let logger = logger.clone();
let repo_ = repo.clone();

let future = async move {
let mut snapshot = vec![];
let repodata = format!("{}/{}/repodata.json", base, repo);
let index_data = client.get(&repodata).send().await?.bytes().await?;
let packages = parse_index(&index_data)?;
snapshot.extend(packages.into_iter().map(|pkg| format!("{}/{}", repo, pkg)));
progress.set_message(&repo);
snapshot.append(&mut vec![
"repodata.json".to_string(),
"repodata.json.bz2".to_string(),
"current_repodata.json".to_string(),
]);
Ok::<_, Error>(snapshot)
};

async move {
let result = future.await;
if let Err(err) = result.as_ref() {
warn!(logger, "failed to fetch {}: {:?}", repo_, err);
}
result
}
};

let snapshots = stream::iter(self.repos.repos.clone())
.map(fetch)
.buffer_unordered(4)
.try_collect::<Vec<_>>()
.await?
.into_iter()
.flatten()
.map(SnapshotPath)
.collect::<Vec<_>>();

Ok(snapshots)
}

fn info(&self) -> String {
format!("conda, {:?}", self.config)
}
}

#[async_trait]
impl SourceStorage<SnapshotPath, TransferURL> for Conda {
async fn get_object(&self, snapshot: &SnapshotPath, _mission: &Mission) -> Result<TransferURL> {
Ok(TransferURL(format!("{}/{}", self.repos.base, snapshot.0)))
}
}
4 changes: 4 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@ pub enum Error {
HTTPError(reqwest::StatusCode),
#[error("Pipe Error {0}")]
PipeError(String),
#[error("Json Decode Error {0}")]
JsonDecodeError(#[from] serde_json::Error),
#[error("Datetime Parse Error {0}")]
DatetimeParseError(#[from] chrono::ParseError),
}

impl<T: std::fmt::Debug> From<rusoto_core::RusotoError<T>> for Error {
Expand Down
33 changes: 26 additions & 7 deletions src/file_backend.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use crate::common::{Mission, SnapshotConfig, SnapshotPath};
use crate::error::{Error, Result};
use crate::stream_pipe::ByteStream;
use crate::traits::{SnapshotStorage, TargetStorage};
use crate::traits::{Key, SnapshotStorage, TargetStorage};
use crate::{
common::{Mission, SnapshotConfig, SnapshotPath},
metadata::SnapshotMeta,
opts::Target,
};

use async_trait::async_trait;
use slog::info;
Expand Down Expand Up @@ -60,24 +64,39 @@ impl SnapshotStorage<SnapshotPath> for FileBackend {
}

#[async_trait]
impl TargetStorage<SnapshotPath, ByteStream> for FileBackend {
impl<Snapshot: Key> TargetStorage<Snapshot, ByteStream> for FileBackend {
async fn put_object(
&self,
snapshot: &SnapshotPath,
snapshot: &Snapshot,
byte_stream: ByteStream,
_mission: &Mission,
) -> Result<()> {
let path = byte_stream.object.use_file();
let target: std::path::PathBuf = format!("{}/{}", self.base_path, snapshot.0).into();
let target: std::path::PathBuf = format!("{}/{}", self.base_path, snapshot.key()).into();
let parent = target.parent().unwrap();
tokio::fs::create_dir_all(parent).await?;
tokio::fs::rename(path, target).await?;
Ok(())
}

async fn delete_object(&self, snapshot: &SnapshotPath, _mission: &Mission) -> Result<()> {
let target = format!("{}/{}", self.base_path, snapshot.0);
async fn delete_object(&self, snapshot: &Snapshot, _mission: &Mission) -> Result<()> {
let target = format!("{}/{}", self.base_path, snapshot.key());
tokio::fs::remove_file(target).await?;
Ok(())
}
}

#[async_trait]
impl SnapshotStorage<SnapshotMeta> for FileBackend {
async fn snapshot(
&mut self,
_mission: Mission,
_config: &SnapshotConfig,
) -> Result<Vec<SnapshotMeta>> {
panic!("not supported");
}

fn info(&self) -> String {
format!("file, {:?}", self)
}
}
9 changes: 9 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
mod common;
mod conda;
mod crates_io;
mod dart;
mod error;
Expand All @@ -7,6 +8,7 @@ mod github_release;
mod gradle;
mod homebrew;
mod html_scanner;
mod metadata;
mod mirror_intel;
mod opts;
mod pypi;
Expand Down Expand Up @@ -92,6 +94,13 @@ fn main() {
Source::CratesIo(source) => {
transfer!(opts, source, transfer_config);
}
Source::Conda(config) => {
let source = conda::Conda::new(config);
transfer!(opts, source, transfer_config);
}
Source::Rsync(source) => {
transfer!(opts, source, transfer_config);
}
}
});
}
70 changes: 70 additions & 0 deletions src/metadata.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use async_trait::async_trait;

use crate::common::{Mission, SnapshotConfig, SnapshotPath};
use crate::error::Result;
use crate::traits::{Diff, Key, SnapshotStorage};

#[derive(Clone, Debug)]
pub struct SnapshotMeta {
pub key: String,
pub size: Option<u64>,
pub last_modified: Option<u64>,
pub checksum_method: Option<String>,
pub checksum: Option<String>,
}

pub struct MetaAsPath<Source: SnapshotStorage<SnapshotMeta> + std::fmt::Debug + std::marker::Send> {
source: Source,
}

#[async_trait]
impl<Source: SnapshotStorage<SnapshotMeta> + std::fmt::Debug + std::marker::Send>
SnapshotStorage<SnapshotPath> for MetaAsPath<Source>
{
async fn snapshot(
&mut self,
mission: Mission,
config: &SnapshotConfig,
) -> Result<Vec<SnapshotPath>> {
let snapshot = self.source.snapshot(mission, config).await?;
Ok(snapshot
.into_iter()
.map(|item| SnapshotPath(item.key))
.collect())
}

fn info(&self) -> String {
format!("as snapshot path, {:?}", self.source)
}
}

impl Key for SnapshotMeta {
fn key(&self) -> &str {
&self.key
}
}

fn compare_option<T: Eq>(a: &Option<T>, b: &Option<T>) -> bool {
match (a, b) {
(Some(a), Some(b)) => a == b,
_ => true,
}
}

impl Diff for SnapshotMeta {
fn diff(&self, other: &Self) -> bool {
if !compare_option(&self.size, &other.size) {
return true;
}
if !compare_option(&self.last_modified, &other.last_modified) {
return true;
}
if !compare_option(&self.checksum_method, &other.checksum_method) {
return true;
}
if !compare_option(&self.checksum, &other.checksum) {
return true;
}
false
}
}
Loading

0 comments on commit 357a6e2

Please sign in to comment.