Skip to content
This repository has been archived by the owner on Aug 13, 2024. It is now read-only.

Commit

Permalink
Improve output to STDERR / STDOUT (#51)
Browse files Browse the repository at this point in the history
A lot of refactoring took place here to draw the output from a store of task results
rather than an ongoing stream

WIP - large refactor

use chrono crate instead of deprecated time crate
pull out print logic from factotum, put in cli making it testable
print task stderr to stderr
ensure executor runs as normal when no config is passed (treats everything as a raw string)
save the time a task started, include in output

WIP Misc output fixes / new duration format

WIP cleanup cli output some more

WIP fix order of events display glitch

WIP bold stdout
  • Loading branch information
ninjabear committed Jun 12, 2016
1 parent ecdbbf0 commit 3ee78cb
Show file tree
Hide file tree
Showing 16 changed files with 876 additions and 398 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ log4rs = "0.3.3"
daggy = "0.4.0"
rustc-serialize = "0.3.18"
valico = "0.8.2"
time = "*"
chrono = "0.2"
colored = "1.2"
mustache = "*"
215 changes: 215 additions & 0 deletions src/factotum/executor/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright (c) 2016 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0, and
* you may not use this file except in compliance with the Apache License
* Version 2.0. You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the Apache License Version 2.0 is distributed on an "AS
* IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the Apache License Version 2.0 for the specific language
* governing permissions and limitations there under.
*/

use factotum::factfile::*;
use std::process::Command;
use std::time::{Duration, Instant};
use std::thread;
use std::sync::mpsc;
use std::collections::HashMap;
use chrono::DateTime;
use chrono::UTC;

enum TaskResult {
Ok(i32, Duration),
TerminateJobPlease(i32, Duration),
Error(Option<i32>, String)
}

pub struct RunResult {
pub run_started: DateTime<UTC>,
pub duration: Duration,
pub requests_job_termination: bool,
pub task_execution_error: Option<String>,
pub stdout: Option<String>,
pub stderr: Option<String>,
pub return_code: i32
}

pub struct TaskExecutionResult {
pub name: String,
pub attempted: bool,
pub run_details: Option<RunResult>
}

pub enum ExecutionResult {
AllTasksComplete(Vec<TaskExecutionResult>),
EarlyFinishOk(Vec<TaskExecutionResult>),
AbnormalTermination(Vec<TaskExecutionResult>)
}

#[inline]
fn drain_values(mut map:HashMap<String, TaskExecutionResult>, tasks_in_order:&Vec<Vec<&Task>>) -> Vec<TaskExecutionResult> {
let mut task_seq:Vec<TaskExecutionResult> = vec![];
for task_level in tasks_in_order.iter() {
for task in task_level.iter() {
match map.remove(&task.name) {
Some(task_result) => task_seq.push(task_result),
_ => warn!("A task ({}) does not have an execution result? Skipping", task.name)
}
}
}
task_seq
}

pub fn execute_factfile(factfile:&Factfile) -> ExecutionResult {
let tasks = factfile.get_tasks_in_order();

let mut task_results:HashMap<String, TaskExecutionResult> = HashMap::new();
for task_level in tasks.iter() { // TODO replace me with helper iterator
for task in task_level.iter() {
let new_task_result = TaskExecutionResult { name: task.name.clone(), attempted: false, run_details:None };
task_results.insert(new_task_result.name.clone(), new_task_result );
}
}

for task_level in tasks.iter() {
// everything in a task "level" gets run together
// this isn't quite right in a dag sense, but I think practically it'll be ok (if not we'll come back to it)
let (tx, rx) = mpsc::channel::<(usize, TaskResult, Option<String>, Option<String>, DateTime<UTC>)>();

for (idx,task) in task_level.iter().enumerate() {
info!("Running task '{}'!", task.name);
{
let tx = tx.clone();
let args = format_args(&task.command, &task.arguments);
let executor = task.executor.to_string();
let continue_job_codes = task.on_result.continue_job.clone();
let terminate_job_codes = task.on_result.terminate_job.clone();
let task_name = task.name.to_string();

thread::spawn(move || {
//println!("Executing task '{}'!", &task_name.cyan());
let start_time = UTC::now();
let (task_result, stdout, stderr) = execute_task(task_name, executor, args, terminate_job_codes, continue_job_codes);
tx.send((idx, task_result, stdout, stderr, start_time)).unwrap();
});
}
}

let mut terminate_job_please = false;
let mut task_failed = false;

for _ in 0..task_level.len() {
match rx.recv().unwrap() {
(idx, TaskResult::Ok(code, duration), stdout, stderr, start_time) => {
info!("'{}' returned {} in {:?}", task_level[idx].name, code, duration); // todo; sensible Display implementation of Duration
//println!("Task '{}' after {} returned {}", &task_level[idx].name.cyan(), duration, code);
let task_result:&mut TaskExecutionResult = task_results.get_mut(&task_level[idx].name).unwrap();
task_result.attempted = true;
task_result.run_details = Some(RunResult { run_started: start_time,
duration: duration,
requests_job_termination: false,
task_execution_error: None,
stdout: stdout,
stderr: stderr,
return_code: code });
},
(idx, TaskResult::Error(code, msg), stdout, stderr, start_time) => {
warn!("task '{}' failed to execute!\n{}", task_level[idx].name, msg);
//println!("{}", &msg.red());
let task_result:&mut TaskExecutionResult = task_results.get_mut(&task_level[idx].name).unwrap();
task_result.attempted = true;

if let Some(return_code) = code {
task_result.run_details = Some(RunResult {
run_started: start_time,
duration: Duration::from_secs(0),
requests_job_termination: false,
task_execution_error: Some(msg),
stdout: stdout,
stderr: stderr,
return_code: return_code });
}
task_failed = true;
},
(idx, TaskResult::TerminateJobPlease(code, duration), stdout, stderr, start_time) => {
warn!("job will stop as task '{}' called for termination (no-op) with code {}", task_level[idx].name, code);
//println!("Job will now stop as task '{}' ended with {}", &task_level[idx].name.cyan(), code);

let task_result:&mut TaskExecutionResult = task_results.get_mut(&task_level[idx].name).unwrap();
task_result.attempted = true;
task_result.run_details = Some(RunResult {
run_started: start_time,
duration: duration,
requests_job_termination: true,
task_execution_error: None,
stdout: stdout,
stderr: stderr,
return_code: code });

terminate_job_please = true;
}
}
}

match (terminate_job_please, task_failed) {
(_, true) => { return ExecutionResult::AbnormalTermination(drain_values(task_results, &tasks)); },
(true, false) => { return ExecutionResult::EarlyFinishOk(drain_values(task_results, &tasks)); },
_ => {}
}
}

ExecutionResult::AllTasksComplete(drain_values(task_results, &tasks))
}

fn execute_task(task_name:String, executor:String, args:String, terminate_job_codes:Vec<i32>, continue_job_codes:Vec<i32>) -> (TaskResult, Option<String>, Option<String>) {
if executor!="shell" {
return (TaskResult::Error(None, "Only shell executions are supported currently!".to_string()), None, None)
} else {
let run_start = Instant::now();
info!("Executing sh -c {:?}", args);
match Command::new("sh").arg("-c").arg(args).output() {
Ok(r) => {
let run_duration = run_start.elapsed();
let return_code = r.status.code().unwrap_or(1); // 1 will be returned if the process was killed by a signal

let task_stdout: String = String::from_utf8_lossy(&r.stdout).trim_right().into();
let task_stderr: String = String::from_utf8_lossy(&r.stderr).trim_right().into();

info!("task '{}' stdout:\n'{}'", task_name, task_stdout);
info!("task '{}' stderr:\n'{}'", task_name, task_stderr);

let task_stdout_opt = if task_stdout.is_empty() { None } else { Some(task_stdout) };
let task_stderr_opt = if task_stderr.is_empty() { None } else { Some(task_stderr) };

if terminate_job_codes.contains(&return_code) {
(TaskResult::TerminateJobPlease(return_code, run_duration), task_stdout_opt, task_stderr_opt)
} else if continue_job_codes.contains(&return_code) {
(TaskResult::Ok(return_code, run_duration), task_stdout_opt, task_stderr_opt)
} else {
let expected_codes = continue_job_codes.iter()
.map(|code| code.to_string())
.collect::<Vec<String>>()
.join(",");
(TaskResult::Error(Some(return_code), format!("the task exited with a value not specified in continue_job - {} (task expects one of the following return codes to continue [{}])", return_code, expected_codes)),
task_stdout_opt,
task_stderr_opt)
}

},
Err(message) => (TaskResult::Error(None, format!("Error executing process - {}", message)), None, None)
}
}
}

fn format_args(command:&str, args:&Vec<String>) -> String {
let arg_str = args.iter()
.map(|s| format!("\"{}\"", s))
.collect::<Vec<String>>()
.join(" ");
format!("{} {}", command, arg_str)
}

6 changes: 4 additions & 2 deletions src/factotum/factfile/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@

#[cfg(test)]
mod tests;

use daggy::*;
use factotum::sequencer;

pub struct Factfile {
pub name:String,
Expand Down Expand Up @@ -53,12 +55,12 @@ impl Factfile {

pub fn get_tasks_in_order<'a>(&'a self) -> Vec<Vec<&'a Task>> {
let mut tree:Vec<Vec<&Task>> = vec![];
super::get_tasks_in_order(&self.dag, &self.dag.children(self.root).iter(&self.dag).map(|(_, node_idx)| node_idx).collect(), &mut tree);
sequencer::get_tasks_in_order(&self.dag, &self.dag.children(self.root).iter(&self.dag).map(|(_, node_idx)| node_idx).collect(), &mut tree);
tree
}

fn find_task_by_name(&self, name:&str) -> Option<(NodeIndex, &Task)> {
super::find_task_recursive(&self.dag, name, self.root)
sequencer::find_task_recursive(&self.dag, name, self.root)
}

pub fn add_task_obj(&mut self, task:&Task) {
Expand Down
53 changes: 4 additions & 49 deletions src/factotum/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,56 +14,11 @@
*/

pub mod factfile;
pub mod fileparser;
pub mod runner;
pub mod parser;
pub mod executor;
pub mod sequencer;

#[cfg(test)]
mod tests;

use factotum::factfile::Task;
use daggy::*;

fn find_task_recursive<'a>(dag: &'a Dag<Task, ()>, name:&str, start:NodeIndex) -> Option<(NodeIndex, &'a Task)> {
if dag.children(start).iter(&dag).count() != 0 {
if let Some((_, node)) = dag.children(start).find(&dag, |g, _, n| g[n].name == name) {
return Some((node, &dag[node]))
} else {
for (_, child_node) in dag.children(start).iter(&dag) {
if let Some(v) = find_task_recursive(dag, name, child_node) {
return Some(v);
}
}
None
}
} else {
None
}
}

fn get_tasks_in_order<'a>(dag: &'a Dag<Task, ()>, start:&Vec<NodeIndex>, tree:&mut Vec<Vec<&'a Task>>) {
let mut row:Vec<&Task> = vec![];

for idx in start {
for row in tree.iter_mut() {
let posn = row.iter().position(|s| s.name==dag[*idx].name);
if let Some(remove_idx) = posn {
row.remove(remove_idx);
}
}
let no_dups = !row.iter().any(|s| s.name==dag[*idx].name);
if no_dups {
row.push(&dag[*idx]);
}
}
tree.push(row);

let mut children:Vec<NodeIndex> = vec![];
for parent in start.iter() {
for (_, node_index) in dag.children(*parent).iter(&dag) {
children.push(node_index);
}
}

if children.len() != 0 {
get_tasks_in_order(dag, &children, tree);
}
}
Loading

0 comments on commit 3ee78cb

Please sign in to comment.