Skip to content

Commit

Permalink
feat(worker): add option for worker lifetime
Browse files Browse the repository at this point in the history
Resolves: #25
  • Loading branch information
BobAnkh committed Oct 14, 2024
1 parent c39e24a commit bb30eab
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 10 deletions.
1 change: 1 addition & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ access_token_expires_in = "7d"
coordinator_addr = "http://127.0.0.1:5000"
polling_interval = "3m"
heartbeat_interval = "5m"
lifetime = "7d"
# credential_path is not set
# user is not set
# password is not set
Expand Down
19 changes: 10 additions & 9 deletions netmito/src/api/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::{
error::{ApiError, ApiResult, Error},
schema::*,
service::{
auth::{token::generate_token, worker_auth_middleware, AuthUser, AuthWorker},
auth::{token::generate_worker_token, worker_auth_middleware, AuthUser, AuthWorker},
s3::{get_artifact, get_attachment},
task::get_task,
worker,
Expand Down Expand Up @@ -53,14 +53,15 @@ pub async fn register(
ApiError::InternalServerError
}
})?;
let token = generate_token(uuid.clone().to_string(), 0).map_err(|e| match e {
crate::error::Error::AuthError(err) => ApiError::AuthError(err),
crate::error::Error::ApiError(e) => e,
_ => {
tracing::error!("{}", e);
ApiError::InternalServerError
}
})?;
let token =
generate_worker_token(uuid.clone().to_string(), 0, req.lifetime).map_err(|e| match e {
crate::error::Error::AuthError(err) => ApiError::AuthError(err),
crate::error::Error::ApiError(e) => e,
_ => {
tracing::error!("{}", e);
ApiError::InternalServerError
}
})?;
let redis_url = REDIS_CONNECTION_INFO.get().map(|info| info.worker_url());
Ok(Json(RegisterWorkerResp {
worker_id: uuid,
Expand Down
7 changes: 7 additions & 0 deletions netmito/src/config/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub struct WorkerConfig {
pub(crate) tags: HashSet<String>,
pub(crate) log_path: Option<RelativePathBuf>,
pub(crate) file_log: bool,
#[serde(with = "humantime_serde")]
pub(crate) lifetime: Option<Duration>,
}

#[derive(Args, Debug, Serialize, Default)]
Expand Down Expand Up @@ -76,6 +78,10 @@ pub struct WorkerConfigCli {
/// Enable logging to file
#[arg(long)]
pub file_log: bool,
// The lifetime of the worker to alive (e.g., 7d, 1year)
#[arg(long)]
#[serde(skip_serializing_if = "::std::option::Option::is_none")]
pub lifetime: Option<String>,
}

impl Default for WorkerConfig {
Expand All @@ -91,6 +97,7 @@ impl Default for WorkerConfig {
tags: HashSet::new(),
log_path: None,
file_log: false,
lifetime: None,
}
}
}
Expand Down
3 changes: 3 additions & 0 deletions netmito/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ pub struct GroupQueryInfo {
pub struct RegisterWorkerReq {
pub tags: HashSet<String>,
pub groups: HashSet<String>,
#[serde(default)]
#[serde(with = "humantime_serde")]
pub lifetime: Option<std::time::Duration>,
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
40 changes: 39 additions & 1 deletion netmito/src/service/auth/token.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use jsonwebtoken::EncodingKey;
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;

use crate::error::DecodeTokenError;
use crate::error::{ApiError, DecodeTokenError};

#[derive(Debug, Serialize, Deserialize)]
pub struct TokenClaims<'a> {
Expand Down Expand Up @@ -41,6 +41,44 @@ where
encode_token(&claims, encoding_key)
}

pub fn generate_worker_token<T>(
username: T,
sign: i64,
lifetime: Option<std::time::Duration>,
) -> crate::error::Result<String>
where
T: AsRef<str>,
{
let token_ttl = match lifetime {
Some(ttl) => time::Duration::try_from(ttl).map_err(|_| {
ApiError::InvalidRequest(format!(
"Invalid lifetime {}",
humantime_serde::re::humantime::format_duration(ttl)
))
})?,
None => {
crate::config::SERVER_CONFIG
.get()
.ok_or(crate::error::Error::Custom(
"server config not found".to_string(),
))?
.token_expires_in
}
};
let claims = TokenClaims {
sub: Cow::from(username.as_ref()),
exp: OffsetDateTime::now_utc() + token_ttl,
sign,
};

let encoding_key = crate::config::ENCODING_KEY
.get()
.ok_or(crate::error::Error::Custom(
"encoding key not found".to_string(),
))?;
encode_token(&claims, encoding_key)
}

pub fn encode_token(
claims: &TokenClaims,
encoding_key: &EncodingKey,
Expand Down
1 change: 1 addition & 0 deletions netmito/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ impl MitoWorker {
let req = RegisterWorkerReq {
tags: config.tags.clone(),
groups: config.groups.clone(),
lifetime: config.lifetime,
};
let resp = http_client
.post(url.as_str())
Expand Down

0 comments on commit bb30eab

Please sign in to comment.