Skip to content

Commit

Permalink
feat: proper timeout handling
Browse files Browse the repository at this point in the history
  • Loading branch information
timmylindh committed May 2, 2024
1 parent e3460af commit bfbe712
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 26 deletions.
9 changes: 9 additions & 0 deletions src/Exceptions/DidTimeoutAndFailException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
<?php

namespace Timmylindh\LaravelBeanstalkWorker\Exceptions;

use Exception;

class DidTimeoutAndFailException extends Exception
{
}
9 changes: 2 additions & 7 deletions src/Http/Controllers/WorkerController.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Illuminate\Queue\WorkerOptions;
use Illuminate\Support\Facades\Artisan;
use Symfony\Component\Console\Output\BufferedOutput;
use Timmylindh\LaravelBeanstalkWorker\Exceptions\DidTimeoutAndFailException;
use Timmylindh\LaravelBeanstalkWorker\SQSJob;
use Timmylindh\LaravelBeanstalkWorker\SQSQueueModifier;
use Timmylindh\LaravelBeanstalkWorker\SQSWorker;
Expand Down Expand Up @@ -40,13 +41,6 @@ public function queue(

$job = new SQSJob($container, $queueModifier, $queue, $jobData);

if ($job->hasTimedoutAndShouldFail()) {
return response()->json([
'status' => 'timeout-should-fail',
'id' => $job->getJobId(),
]);
}

try {
$worker->process(
$queue,
Expand All @@ -57,6 +51,7 @@ public function queue(
timeout: config('worker.timeout'),
),
);
} catch (DidTimeoutAndFailException $e) {
} catch (\Throwable $e) {
report($e);
}
Expand Down
13 changes: 9 additions & 4 deletions src/LaravelBeanstalkWorkerServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@

use Illuminate\Contracts\Debug\ExceptionHandler;
use Illuminate\Support\ServiceProvider;
use Illuminate\Contracts\Queue\Factory as QueueFactory;
use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\SqsQueue;
use Illuminate\Support\Facades\Event;
use Timmylindh\LaravelBeanstalkWorker\Listeners\LogFailedJob;

class LaravelBeanstalkWorkerServiceProvider extends ServiceProvider
{
Expand All @@ -28,7 +30,7 @@ public function register()

$this->app->singleton(
SqsQueue::class,
fn($app) => $app->make(QueueFactory::class)->connection('sqs'),
fn($app) => $app['queue']->connection('sqs'),
);
}

Expand All @@ -41,8 +43,11 @@ public function boot(): void
'laravel-beanstalk-worker-config',
);

if (config('worker.is_worker')) {
$this->loadRoutesFrom(__DIR__ . '/../routes/routes.php');
if (!config('worker.is_worker')) {
return;
}

$this->loadRoutesFrom(__DIR__ . '/../routes/routes.php');
Event::listen(JobFailed::class, LogFailedJob::class);
}
}
18 changes: 18 additions & 0 deletions src/Listeners/LogFailedJob.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Timmylindh\LaravelBeanstalkWorker\Listeners;

use Illuminate\Queue\Events\JobFailed;

class LogFailedJob
{
public function handle(JobFailed $event)
{
app('queue.failer')->log(
$event->connectionName,
$event->job->getQueue(),
$event->job->getRawBody(),
$event->exception,
);
}
}
2 changes: 2 additions & 0 deletions src/SQSJob.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public function __construct(
$this->queue = $queue;
$this->queueModifier = $queueModifier;
$this->container = $container;
$this->connectionName = 'sqs';

$this->payload = parent::payload();
}

Expand Down
22 changes: 7 additions & 15 deletions src/SQSWorker.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,26 @@
use Illuminate\Queue\Worker;
use Illuminate\Queue\WorkerOptions;
use Throwable;
use Timmylindh\LaravelBeanstalkWorker\Exceptions\DidTimeoutAndFailException;

class SQSWorker extends Worker
{
/**
* Process the given job from the queue.
*
* @param string $connectionName
* @param \Illuminate\Contracts\Queue\Job $job
* @param SQSJob $job
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
*
* @throws \Throwable
*/
public function process($connectionName, $job, WorkerOptions $options)
{
if ($job->hasTimedoutAndShouldFail()) {
throw new DidTimeoutAndFailException();
}

$this->startTimeoutHandler($job, $options);
parent::process($connectionName, $job, $options);
}
Expand Down Expand Up @@ -51,23 +56,10 @@ protected function startTimeoutHandler($job, WorkerOptions $options)
return;
}

$this->markJobAsFailedIfWillExceedMaxAttempts(
$job->getConnectionName(),
$job,
(int) $options->maxTries,
$e = $this->timeoutExceededException($job),
);

$this->markJobAsFailedIfWillExceedMaxExceptions(
$job->getConnectionName(),
$job,
$e,
);

$this->markJobAsFailedIfItShouldFailOnTimeout(
$job->getConnectionName(),
$job,
$e,
$this->timeoutExceededException($job),
);

$this->events->dispatch(
Expand Down
10 changes: 10 additions & 0 deletions tests/LaravelBeanstalkWorkerServiceProviderTest.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
<?php

use Illuminate\Queue\Events\JobFailed;
use Illuminate\Queue\SqsQueue;
use Timmylindh\LaravelBeanstalkWorker\Listeners\LogFailedJob;
use Timmylindh\LaravelBeanstalkWorker\SQSWorker;

it('registers the SQSWorker', function () {
Expand All @@ -9,3 +12,10 @@
it('registers the SqsQueue', function () {
expect(app(SqsQueue::class))->toBeInstanceOf(SqsQueue::class);
});

it('registers the JobFailed event listener', function () {
$this->assertListenerIsAttachedToEvent(
LogFailedJob::class,
JobFailed::class,
);
});
32 changes: 32 additions & 0 deletions tests/TestCase.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

namespace Timmylindh\LaravelBeanstalkWorker\Tests;

use Illuminate\Events\Dispatcher;
use Orchestra\Testbench\TestCase as Orchestra;
use ReflectionFunction;
use Timmylindh\LaravelBeanstalkWorker\LaravelBeanstalkWorkerServiceProvider;

class TestCase extends Orchestra
Expand All @@ -27,4 +29,34 @@ public function getEnvironmentSetUp($app)
$migration->up();
*/
}

public function assertListenerIsAttachedToEvent($listener, $event)
{
$dispatcher = app(Dispatcher::class);

foreach (
$dispatcher->getListeners(
is_object($event) ? get_class($event) : $event,
)
as $listenerClosure
) {
$reflection = new ReflectionFunction($listenerClosure);
$listenerClass = $reflection->getStaticVariables()['listener'];

if ($listenerClass === $listener) {
$this->assertTrue(true);

return;
}
}

$this->assertTrue(
false,
sprintf(
'Event %s does not have the %s listener attached to it',
$event,
$listener,
),
);
}
}

0 comments on commit bfbe712

Please sign in to comment.