From 07a9402f5d1b2fb5dedc22751a59914ebcf41562 Mon Sep 17 00:00:00 2001 From: Taylor Otwell Date: Fri, 30 Dec 2016 10:07:57 -0600 Subject: [PATCH] Beginning re-factors of listener. --- .../Queue/Console/FailedTableCommand.php | 33 ++++-- .../Queue/Console/ListenCommand.php | 10 +- src/Illuminate/Queue/Listener.php | 107 ++++++++++-------- 3 files changed, 85 insertions(+), 65 deletions(-) diff --git a/src/Illuminate/Queue/Console/FailedTableCommand.php b/src/Illuminate/Queue/Console/FailedTableCommand.php index 9826498e3669..c549a3ebb3e9 100644 --- a/src/Illuminate/Queue/Console/FailedTableCommand.php +++ b/src/Illuminate/Queue/Console/FailedTableCommand.php @@ -59,16 +59,10 @@ public function fire() { $table = $this->laravel['config']['queue.failed.table']; - $tableClassName = Str::studly($table); - - $fullPath = $this->createBaseMigration($table); - - $stub = str_replace( - ['{{table}}', '{{tableClassName}}'], [$table, $tableClassName], $this->files->get(__DIR__.'/stubs/failed_jobs.stub') + $this->replaceMigration( + $this->createBaseMigration($table), $table, Str::studly($table) ); - $this->files->put($fullPath, $stub); - $this->info('Migration created successfully!'); $this->composer->dumpAutoloads(); @@ -82,10 +76,27 @@ public function fire() */ protected function createBaseMigration($table = 'failed_jobs') { - $name = 'create_'.$table.'_table'; + return $this->laravel['migration.creator']->create( + 'create_'.$table.'_table', $this->laravel->databasePath().'/migrations' + ); + } - $path = $this->laravel->databasePath().'/migrations'; + /** + * Replace the generated migration with the failed job table stub. + * + * @param string $path + * @param string $table + * @param string $tableClassName + * @return void + */ + protected function replaceMigration($path, $table, $tableClassName) + { + $stub = str_replace( + ['{{table}}', '{{tableClassName}}'], + [$table, $tableClassName], + $this->files->get(__DIR__.'/stubs/failed_jobs.stub') + ); - return $this->laravel['migration.creator']->create($name, $path); + $this->files->put($path, $stub); } } diff --git a/src/Illuminate/Queue/Console/ListenCommand.php b/src/Illuminate/Queue/Console/ListenCommand.php index 67de1d16e27e..66ceea4aea74 100755 --- a/src/Illuminate/Queue/Console/ListenCommand.php +++ b/src/Illuminate/Queue/Console/ListenCommand.php @@ -52,17 +52,17 @@ public function fire() { $this->setListenerOptions(); - $delay = $this->input->getOption('delay'); + $connection = $this->input->getArgument('connection'); // The memory limit is the amount of memory we will allow the script to occupy // before killing it and letting a process manager restart it for us, which // is to protect us against any memory leaks that will be in the scripts. $memory = $this->input->getOption('memory'); - $connection = $this->input->getArgument('connection'); - $timeout = $this->input->getOption('timeout'); + $delay = $this->input->getOption('delay'); + // We need to get the right queue for the connection which is set in the queue // configuration file for the application. We will pull it based on the set // connection being run for the queue operation currently being executed. @@ -85,7 +85,9 @@ protected function getQueue($connection) $connection = $this->laravel['config']['queue.default']; } - $queue = $this->laravel['config']->get("queue.connections.{$connection}.queue", 'default'); + $queue = $this->laravel['config']->get( + "queue.connections.{$connection}.queue", 'default' + ); return $this->input->getOption('queue') ?: $queue; } diff --git a/src/Illuminate/Queue/Listener.php b/src/Illuminate/Queue/Listener.php index 5ab42b05b076..a84cc6790a4c 100755 --- a/src/Illuminate/Queue/Listener.php +++ b/src/Illuminate/Queue/Listener.php @@ -98,27 +98,6 @@ public function listen($connection, $queue, $delay, $memory, $timeout = 60) } } - /** - * Run the given process. - * - * @param \Symfony\Component\Process\Process $process - * @param int $memory - * @return void - */ - public function runProcess(Process $process, $memory) - { - $process->run(function ($type, $line) { - $this->handleWorkerOutput($type, $line); - }); - - // Once we have run the job we'll go check if the memory limit has been - // exceeded for the script. If it has, we will kill this script so a - // process manager will restart this with a clean slate of memory. - if ($this->memoryExceeded($memory)) { - $this->stop(); - } - } - /** * Create a new Symfony process for the worker. * @@ -131,29 +110,77 @@ public function runProcess(Process $process, $memory) */ public function makeProcess($connection, $queue, $delay, $memory, $timeout) { - $string = $this->workerCommand; + $command = $this->workerCommand; // If the environment is set, we will append it to the command string so the // workers will run under the specified environment. Otherwise, they will // just run under the production environment which is not always right. if (isset($this->environment)) { - $string .= ' --env='.ProcessUtils::escapeArgument($this->environment); + $command = $this->addEnvironment($command); } // Next, we will just format out the worker commands with all of the various // options available for the command. This will produce the final command // line that we will pass into a Symfony process object for processing. - $command = sprintf( - $string, + $command = $this->formatCommand( + $command, $connection, $queue, $delay, $memory + ); + + return new Process( + $command, $this->commandPath, null, null, $timeout + ); + } + + /** + * Add the environment option to the given command. + * + * @param string $command + * @return string + */ + protected function addEnvironment($command) + { + return $command.' --env='.ProcessUtils::escapeArgument($this->environment); + } + + /** + * Format the given command with the listener options. + * + * @param string $command + * @param string $connection + * @param string $queue + * @param int $delay + * @param int $memory + * @return string + */ + protected function formatCommand($command, $connection, $queue, $delay, $memory) + { + return sprintf( + $command, ProcessUtils::escapeArgument($connection), ProcessUtils::escapeArgument($queue), - $delay, - $memory, - $this->sleep, - $this->maxTries + $delay, $memory, $this->sleep, $this->maxTries ); + } + + /** + * Run the given process. + * + * @param \Symfony\Component\Process\Process $process + * @param int $memory + * @return void + */ + public function runProcess(Process $process, $memory) + { + $process->run(function ($type, $line) { + $this->handleWorkerOutput($type, $line); + }); - return new Process($command, $this->commandPath, null, null, $timeout); + // Once we have run the job we'll go check if the memory limit has been exceeded + // for the script. If it has, we will kill this script so the process manager + // will restart this with a clean slate of memory automatically on exiting. + if ($this->memoryExceeded($memory)) { + $this->stop(); + } } /** @@ -202,16 +229,6 @@ public function setOutputHandler(Closure $outputHandler) $this->outputHandler = $outputHandler; } - /** - * Get the current listener environment. - * - * @return string - */ - public function getEnvironment() - { - return $this->environment; - } - /** * Set the current environment. * @@ -223,16 +240,6 @@ public function setEnvironment($environment) $this->environment = $environment; } - /** - * Get the amount of seconds to wait before polling the queue. - * - * @return int - */ - public function getSleep() - { - return $this->sleep; - } - /** * Set the amount of seconds to wait before polling the queue. *