Skip to content

Commit

Permalink
Improved handling of simultaneously running jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
andreas committed Apr 29, 2019
1 parent 2864fec commit f9af8c6
Show file tree
Hide file tree
Showing 5 changed files with 155 additions and 67 deletions.
75 changes: 23 additions & 52 deletions src/library/M4bTool/Command/MergeCommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use M4bTool\Executables\Mp4tags;
use M4bTool\Executables\Mp4v2Wrapper;
use M4bTool\Executables\Tasks\ConversionTask;
use M4bTool\Executables\Tasks\Pool;
use M4bTool\Filesystem\DirectoryLoader;
use M4bTool\Chapter\ChapterMarker;
use M4bTool\Parser\FfmetaDataParser;
Expand Down Expand Up @@ -621,8 +622,9 @@ private function convertInputFiles()

$ffmpeg = new Ffmpeg();
$fdkaac = new Fdkaac();
/** @var ConversionTask[] $conversionTasks */
$conversionTasks = [];

$jobs = $this->input->getOption(static::OPTION_JOBS) ? (int)$this->input->getOption(static::OPTION_JOBS) : 1;
$taskPool = new Pool($jobs);

foreach ($this->filesToConvert as $index => $file) {

Expand All @@ -636,7 +638,6 @@ private function convertInputFiles()
unlink($outputFile);
}


$options = new FileConverterOptions();
$options->source = $file;
$options->destination = $outputFile;
Expand All @@ -650,62 +651,35 @@ private function convertInputFiles()
$options->force = $this->optForce;
$options->profile = $this->input->getOption(static::OPTION_AUDIO_PROFILE);

$conversionTasks[] = new ConversionTask($ffmpeg, $fdkaac, $options);
$taskPool->submit(new ConversionTask($ffmpeg, $fdkaac, $options));
}

$jobs = $this->input->getOption(static::OPTION_JOBS) ? (int)$this->input->getOption(static::OPTION_JOBS) : 1;

// minimum 1 job, maximum count conversionTasks jobs
$jobs = max(min($jobs, count($conversionTasks)), 1);

$runningTaskCount = 0;
$conversionTaskQueue = $conversionTasks;
$runningTasks = [];
$start = microtime(true);
$increaseProgressBarSeconds = 5;
do {
$firstFailedTask = null;
if ($runningTaskCount > 0 && $firstFailedTask === null) {
foreach ($runningTasks as $task) {
if ($task->didFail()) {
$firstFailedTask = $task;
break;
}
}
}

// add new tasks, if no task did fail and jobs left
/** @var ConversionTask $task */
$task = null;
while ($firstFailedTask === null && $runningTaskCount < $jobs && $task = array_shift($conversionTaskQueue)) {
$task->run();
$runningTasks[] = $task;
$runningTaskCount++;
}

usleep(250000);

$runningTasks = array_filter($runningTasks, function (ConversionTask $task) {
return $task->isRunning();
});
$this->notice(sprintf("preparing conversion with %d simultaneous %s, which may seem unresponsive, be patient...", $jobs, $jobs === 1 ? "job" : "jobs"));

$taskPool->process(function ($runningTasks, $conversionQueue, $runtime) use ($increaseProgressBarSeconds, $jobs) {
$runningTaskCount = count($runningTasks);
$conversionQueueLength = count($conversionTaskQueue);

$time = microtime(true);
$progressBar = str_repeat("+", ceil(($time - $start) / $increaseProgressBarSeconds));
$this->output->write(sprintf("\r%d/%d remaining tasks running: %s", $runningTaskCount, ($conversionQueueLength + $runningTaskCount), $progressBar), false, OutputInterface::VERBOSITY_VERBOSE);

} while ($conversionQueueLength > 0 || $runningTaskCount > 0);
$conversionQueueLength = count($conversionQueue);
$remainingTaskCount = $conversionQueueLength + $runningTaskCount;
$progressBarLength = ceil($runtime / $increaseProgressBarSeconds);
$progressBar = str_repeat("+", $progressBarLength);
if ($remainingTaskCount === 0) {
$message = sprintf("\rfinished all tasks, preparing next step");
} else if ($runningTaskCount === 0) {
$message = sprintf("\r%d remaining tasks are beeing prepared: %s", $remainingTaskCount, $progressBar);
} else if ($runningTaskCount > 0) {
$message = sprintf("\r%d remaining converting tasks are running: %s", $remainingTaskCount, $progressBar);
} else {
$message = sprintf("\rpreparing conversion: %s", $progressBar);
}
$this->output->write($message, false, OutputInterface::VERBOSITY_VERBOSE);
});
$this->output->writeln("", OutputInterface::VERBOSITY_VERBOSE);
/** @var ConversionTask $firstFailedTask */
if ($firstFailedTask !== null) {
throw new Exception("a task has failed", null, $firstFailedTask->getLastException());
}


/** @var ConversionTask $task */
foreach ($conversionTasks as $index => $task) {
foreach ($taskPool->getTasks() as $index => $task) {
$pad = str_pad($index + 1, $padLen, "0", STR_PAD_LEFT);
$file = $task->getOptions()->source;
$outputFile = $task->getOptions()->destination;
Expand All @@ -721,10 +695,7 @@ private function convertInputFiles()
}

rename($outputFile, $finishedOutputFile);
$task->cleanUp();
}
// }

}

/**
Expand Down
29 changes: 29 additions & 0 deletions src/library/M4bTool/Executables/Tasks/AbstractTask.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
<?php


namespace M4bTool\Executables\Tasks;


use Throwable;

abstract class AbstractTask implements RunnableInterface
{
/** @var Throwable */
protected $lastException;

abstract public function run();

abstract public function isRunning();

abstract public function cleanUp();

public function didFail()
{
return $this->lastException instanceof Throwable;
}

public function getLastException()
{
return $this->lastException;
}
}
16 changes: 2 additions & 14 deletions src/library/M4bTool/Executables/Tasks/ConversionTask.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
use Throwable;


class ConversionTask implements Runnable
class ConversionTask extends AbstractTask
{
/**
* @var Ffmpeg
Expand All @@ -33,9 +33,7 @@ class ConversionTask implements Runnable
/** @var Process */
protected $process;

/** @var Throwable */
protected $lastException;

/** @var SplFileInfo[] */
protected $tmpFilesToCleanUp = [];

public function __construct(Ffmpeg $ffmpeg, Fdkaac $fdkaac, FileConverterOptions $options)
Expand Down Expand Up @@ -78,16 +76,6 @@ public function isRunning()
return false;
}

public function didFail()
{
return $this->lastException instanceof Throwable;
}

public function getLastException()
{
return $this->lastException;
}

public function getOptions()
{
return $this->options;
Expand Down
100 changes: 100 additions & 0 deletions src/library/M4bTool/Executables/Tasks/Pool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<?php


namespace M4bTool\Executables\Tasks;


use Exception;

class Pool
{
const STATUS_SUCCESS = 0;
const STATUS_RUNNING = 1;
const STATUS_FAILED = 2;

protected $size;
/**
* @var AbstractTask[]
*/
protected $tasks = [];

public function __construct($size)
{
$this->size = $size;
}

public function submit(AbstractTask $task)
{
$this->tasks[] = $task;
}

public function getTasks()
{
return $this->tasks;
}

/**
* @param callable|null $progressCallback
* @throws Exception
*/
public function process(callable $progressCallback = null)
{
// minimum 1 job, maximum count conversionTasks jobs
$jobs = max(min($this->size, count($this->tasks)), 1);
$progressCallback = $progressCallback ?? function () {
};
$runningTaskCount = 0;
$conversionTaskQueue = $this->tasks;
$runningTasks = [];
$start = microtime(true);
// $increaseProgressBarSeconds = 5;
do {
$firstFailedTask = null;
if ($runningTaskCount > 0 && $firstFailedTask === null) {
foreach ($runningTasks as $task) {
if ($task->didFail()) {
$firstFailedTask = $task;
break;
}
}
}

// add new tasks, if no task did fail and jobs left
/** @var ConversionTask $task */
$task = null;


while ($firstFailedTask === null && $runningTaskCount < $jobs && $task = array_shift($conversionTaskQueue)) {
$task->run();
$runningTasks[] = $task;
$runningTaskCount++;
}

usleep(250000);

$runningTasks = array_filter($runningTasks, function (ConversionTask $task) {
return $task->isRunning();
});

$runningTaskCount = count($runningTasks);
$conversionQueueLength = count($conversionTaskQueue);
$time = microtime(true);
$progressCallback($conversionTaskQueue, $runningTasks, $time - $start);

// $progressBar = str_repeat("+", ceil(($time - $start) / $increaseProgressBarSeconds));
// $this->output->write(sprintf("\r%d/%d remaining tasks running: %s", $runningTaskCount, ($conversionQueueLength + $runningTaskCount), $progressBar), false, OutputInterface::VERBOSITY_VERBOSE);

} while ($conversionQueueLength > 0 || $runningTaskCount > 0);

foreach ($this->tasks as $task) {
$task->cleanUp();
}

if ($firstFailedTask !== null) {
throw new Exception("a task has failed", null, $firstFailedTask->getLastException());
}

}


}
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace M4bTool\Executables\Tasks;


interface Runnable
interface RunnableInterface
{
public function run();
}

0 comments on commit f9af8c6

Please sign in to comment.