Skip to content

Commit

Permalink
[10.x] Dispatch events based on a DB transaction result (#48705)
Browse files Browse the repository at this point in the history
* wip

* Refactor

* Add EventFake support

* remove strict types

* Make styleCI happy

* Fix test

* Add missing test for EventFake

* Add test to handle nested transactions

* fix typo

* Make styleci happy

* formatting, inject manager resolver

* formatting

* formatting

* formatting

* formatting

* more thorough solution

* Add additional test for nested transactions

* Add additional nested transaction test

---------

Co-authored-by: Taylor Otwell <taylor@laravel.com>
  • Loading branch information
mateusjatenee and taylorotwell authored Oct 30, 2023
1 parent c55667a commit 0525a88
Show file tree
Hide file tree
Showing 12 changed files with 383 additions and 9 deletions.
8 changes: 8 additions & 0 deletions src/Illuminate/Contracts/Events/ShouldDispatchAfterCommit.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Illuminate\Contracts\Events;

interface ShouldDispatchAfterCommit
{
//
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Illuminate\Contracts\Events;

interface ShouldHandleEventsAfterCommit
{
//
}
8 changes: 8 additions & 0 deletions src/Illuminate/Contracts/Queue/ShouldQueueAfterCommit.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
<?php

namespace Illuminate\Contracts\Queue;

interface ShouldQueueAfterCommit extends ShouldQueue
{
//
}
18 changes: 18 additions & 0 deletions src/Illuminate/Database/Eloquent/BroadcastsEventsAfterCommit.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Illuminate\Database\Eloquent;

trait BroadcastsEventsAfterCommit
{
use BroadcastsEvents;

/**
* Determine if the model event broadcast queued job should be dispatched after all transactions are committed.
*
* @return bool
*/
public function broadcastAfterCommit()
{
return true;
}
}
79 changes: 73 additions & 6 deletions src/Illuminate/Events/Dispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@
use Illuminate\Contracts\Broadcasting\ShouldBroadcast;
use Illuminate\Contracts\Container\Container as ContainerContract;
use Illuminate\Contracts\Events\Dispatcher as DispatcherContract;
use Illuminate\Contracts\Events\ShouldDispatchAfterCommit;
use Illuminate\Contracts\Events\ShouldHandleEventsAfterCommit;
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use Illuminate\Support\Traits\Macroable;
Expand Down Expand Up @@ -56,6 +59,13 @@ class Dispatcher implements DispatcherContract
*/
protected $queueResolver;

/**
* The database transaction manager resolver instance.
*
* @var callable
*/
protected $transactionManagerResolver;

/**
* Create a new event dispatcher instance.
*
Expand Down Expand Up @@ -235,10 +245,37 @@ public function dispatch($event, $payload = [], $halt = false)
// When the given "event" is actually an object we will assume it is an event
// object and use the class as the event name and this event itself as the
// payload to the handler, which makes object based events quite simple.
[$event, $payload] = $this->parseEventAndPayload(
$event, $payload
);
[$isEventObject, $event, $payload] = [
is_object($event),
...$this->parseEventAndPayload($event, $payload)
];

// If the event is not intended to be dispatched unless the current database
// transaction is successful, we'll register a callback which will handle
// dispatching this event on the next successful DB transaction commit.
if ($isEventObject &&
$payload[0] instanceof ShouldDispatchAfterCommit &&
! is_null($transactions = $this->resolveTransactionManager())) {
$transactions->addCallback(
fn () => $this->invokeListeners($event, $payload, $halt)
);

return null;
}

return $this->invokeListeners($event, $payload, $halt);
}

/**
* Broadcast an event and call its listeners.
*
* @param string|object $event
* @param mixed $payload
* @param bool $halt
* @return array|null
*/
protected function invokeListeners($event, $payload, $halt = false)
{
if ($this->shouldBroadcast($payload)) {
$this->broadcastEvent($payload[0]);
}
Expand Down Expand Up @@ -525,7 +562,9 @@ protected function createQueuedHandlerCallable($class, $method)
*/
protected function handlerShouldBeDispatchedAfterDatabaseTransactions($listener)
{
return ($listener->afterCommit ?? null) && $this->container->bound('db.transactions');
return (($listener->afterCommit ?? null) ||
$listener instanceof ShouldHandleEventsAfterCommit) &&
$this->resolveTransactionManager();
}

/**
Expand All @@ -540,7 +579,7 @@ protected function createCallbackForListenerRunningAfterCommits($listener, $meth
return function () use ($method, $listener) {
$payload = func_get_args();

$this->container->make('db.transactions')->addCallback(
$this->resolveTransactionManager()->addCallback(
function () use ($listener, $method, $payload) {
$listener->$method(...$payload);
}
Expand Down Expand Up @@ -624,7 +663,12 @@ protected function propagateListenerOptions($listener, $job)
return tap($job, function ($job) use ($listener) {
$data = array_values($job->data);

$job->afterCommit = property_exists($listener, 'afterCommit') ? $listener->afterCommit : null;
if ($listener instanceof ShouldQueueAfterCommit) {
$job->afterCommit = true;
} else {
$job->afterCommit = property_exists($listener, 'afterCommit') ? $listener->afterCommit : null;
}

$job->backoff = method_exists($listener, 'backoff') ? $listener->backoff(...$data) : ($listener->backoff ?? null);
$job->maxExceptions = $listener->maxExceptions ?? null;
$job->retryUntil = method_exists($listener, 'retryUntil') ? $listener->retryUntil(...$data) : null;
Expand Down Expand Up @@ -697,6 +741,29 @@ public function setQueueResolver(callable $resolver)
return $this;
}

/**
* Get the database transaction manager implementation from the resolver.
*
* @return \Illuminate\Database\DatabaseTransactionsManager|null
*/
protected function resolveTransactionManager()
{
return call_user_func($this->transactionManagerResolver);
}

/**
* Set the database transaction manager resolver implementation.
*
* @param callable $resolver
* @return $this
*/
public function setTransactionManagerResolver(callable $resolver)
{
$this->transactionManagerResolver = $resolver;

return $this;
}

/**
* Gets the raw, unprepared listeners.
*
Expand Down
4 changes: 4 additions & 0 deletions src/Illuminate/Events/EventServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ public function register()
$this->app->singleton('events', function ($app) {
return (new Dispatcher($app))->setQueueResolver(function () use ($app) {
return $app->make(QueueFactoryContract::class);
})->setTransactionManagerResolver(function () use ($app) {
return $app->bound('db.transactions')
? $app->make('db.transactions')
: null;
});
});
}
Expand Down
8 changes: 7 additions & 1 deletion src/Illuminate/Mail/SendQueuedMailable.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use Illuminate\Contracts\Mail\Factory as MailFactory;
use Illuminate\Contracts\Mail\Mailable as MailableContract;
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
use Illuminate\Queue\InteractsWithQueue;

class SendQueuedMailable
Expand Down Expand Up @@ -57,7 +58,12 @@ public function __construct(MailableContract $mailable)
{
$this->mailable = $mailable;

$this->afterCommit = property_exists($mailable, 'afterCommit') ? $mailable->afterCommit : null;
if ($mailable instanceof ShouldQueueAfterCommit) {
$this->afterCommit = true;
} else {
$this->afterCommit = property_exists($mailable, 'afterCommit') ? $mailable->afterCommit : null;
}

$this->connection = property_exists($mailable, 'connection') ? $mailable->connection : null;
$this->maxExceptions = property_exists($mailable, 'maxExceptions') ? $mailable->maxExceptions : null;
$this->queue = property_exists($mailable, 'queue') ? $mailable->queue : null;
Expand Down
9 changes: 8 additions & 1 deletion src/Illuminate/Notifications/SendQueuedNotifications.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
use Illuminate\Database\Eloquent\Collection as EloquentCollection;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Queue\InteractsWithQueue;
Expand Down Expand Up @@ -80,7 +81,13 @@ public function __construct($notifiables, $notification, array $channels = null)
$this->tries = property_exists($notification, 'tries') ? $notification->tries : null;
$this->timeout = property_exists($notification, 'timeout') ? $notification->timeout : null;
$this->maxExceptions = property_exists($notification, 'maxExceptions') ? $notification->maxExceptions : null;
$this->afterCommit = property_exists($notification, 'afterCommit') ? $notification->afterCommit : null;

if ($notification instanceof ShouldQueueAfterCommit) {
$this->afterCommit = true;
} else {
$this->afterCommit = property_exists($notification, 'afterCommit') ? $notification->afterCommit : null;
}

$this->shouldBeEncrypted = $notification instanceof ShouldBeEncrypted;
}

Expand Down
5 changes: 5 additions & 0 deletions src/Illuminate/Queue/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Illuminate\Container\Container;
use Illuminate\Contracts\Encryption\Encrypter;
use Illuminate\Contracts\Queue\ShouldBeEncrypted;
use Illuminate\Contracts\Queue\ShouldQueueAfterCommit;
use Illuminate\Queue\Events\JobQueued;
use Illuminate\Support\Arr;
use Illuminate\Support\InteractsWithTime;
Expand Down Expand Up @@ -325,6 +326,10 @@ function () use ($payload, $queue, $delay, $callback, $job) {
*/
protected function shouldDispatchAfterCommit($job)
{
if (is_object($job) && $job instanceof ShouldQueueAfterCommit) {
return true;
}

if (! $job instanceof Closure && is_object($job) && isset($job->afterCommit)) {
return $job->afterCommit;
}
Expand Down
22 changes: 21 additions & 1 deletion src/Illuminate/Support/Testing/Fakes/EventFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
namespace Illuminate\Support\Testing\Fakes;

use Closure;
use Illuminate\Container\Container;
use Illuminate\Contracts\Events\Dispatcher;
use Illuminate\Contracts\Events\ShouldDispatchAfterCommit;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use Illuminate\Support\Traits\ForwardsCalls;
Expand Down Expand Up @@ -297,7 +299,7 @@ public function dispatch($event, $payload = [], $halt = false)
$name = is_object($event) ? get_class($event) : (string) $event;

if ($this->shouldFakeEvent($name, $payload)) {
$this->events[$name][] = func_get_args();
$this->fakeEvent($event, $name, func_get_args());
} else {
return $this->dispatcher->dispatch($event, $payload, $halt);
}
Expand Down Expand Up @@ -329,6 +331,24 @@ protected function shouldFakeEvent($eventName, $payload)
->isNotEmpty();
}

/**
* Push the event onto the fake events array immediately or after the next database transaction.
*
* @param string|object $event
* @param string $name
* @param array $arguments
* @return void
*/
protected function fakeEvent($event, $name, $arguments)
{
if ($event instanceof ShouldDispatchAfterCommit && Container::getInstance()->bound('db.transactions')) {
return Container::getInstance()->make('db.transactions')
->addCallback(fn () => $this->events[$name][] = $arguments);
}

$this->events[$name][] = $arguments;
}

/**
* Determine whether an event should be dispatched or not.
*
Expand Down
60 changes: 60 additions & 0 deletions tests/Integration/Events/EventFakeTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@
namespace Illuminate\Tests\Integration\Events;

use Closure;
use Exception;
use Illuminate\Contracts\Events\ShouldDispatchAfterCommit;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Database\Schema\Blueprint;
use Illuminate\Support\Arr;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\Event;
use Illuminate\Support\Facades\Schema;
use Orchestra\Testbench\TestCase;
use PHPUnit\Framework\ExpectationFailedException;

class EventFakeTest extends TestCase
{
Expand Down Expand Up @@ -182,6 +186,57 @@ public function testMissingMethodsAreForwarded()

$this->assertEquals('bar', Event::fake()->foo());
}

public function testShouldDispatchAfterCommitEventsAreNotDispatchedIfTransactionFails()
{
Event::fake();

try {
DB::transaction(function () {
Event::dispatch(new ShouldDispatchAfterCommitEvent());

throw new Exception('foo');
});
} catch (Exception $e) {
}

Event::assertNotDispatched(ShouldDispatchAfterCommitEvent::class);
}

public function testShouldDispatchAfterCommitEventsAreDispatchedIfTransactionSucceeds()
{
Event::fake();

DB::transaction(function () {
Event::dispatch(new ShouldDispatchAfterCommitEvent());
});

Event::assertDispatched(ShouldDispatchAfterCommitEvent::class);
}

public function testShouldDispatchAfterCommitEventsAreDispatchedIfThereIsNoTransaction()
{
Event::fake();

Event::dispatch(new ShouldDispatchAfterCommitEvent());
Event::assertDispatched(ShouldDispatchAfterCommitEvent::class);
}

public function testAssertNothingDispatchedShouldDispatchAfterCommit()
{
Event::fake();
Event::assertNothingDispatched();

Event::dispatch(new ShouldDispatchAfterCommitEvent);
Event::dispatch(new ShouldDispatchAfterCommitEvent);

try {
Event::assertNothingDispatched();
$this->fail();
} catch (ExpectationFailedException $e) {
$this->assertStringContainsString('2 unexpected events were dispatched.', $e->getMessage());
}
}
}

class Post extends Model
Expand Down Expand Up @@ -248,3 +303,8 @@ public function __invoke($event)
//
}
}

class ShouldDispatchAfterCommitEvent implements ShouldDispatchAfterCommit
{
//
}
Loading

0 comments on commit 0525a88

Please sign in to comment.