Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Unsubscribe from transport on closing connection #12

Merged
merged 1 commit into from
Apr 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions src/Hub/Controller/SubscribeController.php
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,14 @@ function () use ($lastEventId, $stream, $subscribedTopics, $allowedTopics) {

async(
function () use ($stream, $subscribedTopics, $allowedTopics) {
$this->hub->subscribe(function (Update $update) use ($stream, $subscribedTopics, $allowedTopics) {
$this->sendUpdate($update, $stream, $subscribedTopics, $allowedTopics);
});
$callback = fn(Update $update) => $this->sendUpdate(
$update,
$stream,
$subscribedTopics,
$allowedTopics
);
$this->hub->subscribe($callback);
$stream->on('close', fn() => $this->hub->unsubscribe($callback));
}
)();

Expand Down
5 changes: 5 additions & 0 deletions src/Hub/Hub.php
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public function subscribe(callable $callback): void
$this->transport->subscribe($callback);
}

public function unsubscribe(callable $callback): void
{
$this->transport->unsubscribe($callback);
}

public function reconciliate(string $lastEventID): Generator
{
return $this->transport->reconciliate($lastEventID);
Expand Down
5 changes: 5 additions & 0 deletions src/Hub/Transport/PHP/PHPTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public function subscribe(callable $callback): void
$this->eventEmitter->on('mercureUpdate', $callback);
}

public function unsubscribe(callable $callback): void
{
$this->eventEmitter->removeListener('mercureUpdate', $callback);
}

public function reconciliate(string $lastEventID): Generator
{
$yield = self::EARLIEST === $lastEventID;
Expand Down
16 changes: 13 additions & 3 deletions src/Hub/Transport/Redis/RedisTransport.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
namespace Freddie\Hub\Transport\Redis;

use Clue\React\Redis\Client;
use Evenement\EventEmitter;
use Evenement\EventEmitterInterface;
use Freddie\Hub\Transport\TransportInterface;
use Freddie\Message\Update;
use Generator;
Expand All @@ -24,6 +26,7 @@ public function __construct(
public readonly Client $subscriber,
public readonly Client $redis,
private RedisSerializer $serializer = new RedisSerializer(),
private EventEmitterInterface $eventEmitter = new EventEmitter(),
private int $size = 0,
private float $trimInterval = 0.0,
) {
Expand All @@ -32,9 +35,12 @@ public function __construct(
public function subscribe(callable $callback): void
{
$this->init();
$this->subscriber->on('message', function (string $channel, string $payload) use ($callback) {
$callback($this->serializer->deserialize($payload));
});
$this->eventEmitter->on('mercureUpdate', $callback);
}

public function unsubscribe(callable $callback): void
{
$this->eventEmitter->removeListener('mercureUpdate', $callback);
}

public function publish(Update $update): PromiseInterface
Expand Down Expand Up @@ -84,6 +90,10 @@ private function init(): void
}

$this->subscriber->subscribe($this->channel); // @phpstan-ignore-line
$this->subscriber->on('message', function (string $channel, string $payload) {
$this->eventEmitter->emit('mercureUpdate', [$this->serializer->deserialize($payload)]);
});

if ($this->trimInterval > 0) {
Loop::addPeriodicTimer(
$this->trimInterval,
Expand Down
2 changes: 2 additions & 0 deletions src/Hub/Transport/TransportInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ public function publish(Update $update): PromiseInterface;

public function subscribe(callable $callback): void;

public function unsubscribe(callable $callback): void;

/**
* @param string $lastEventID
* @return Generator<Update>
Expand Down
33 changes: 33 additions & 0 deletions tests/Unit/Hub/Controller/SubscribeControllerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,36 @@
AccessDeniedHttpException::class,
'Error while decoding from Base64Url, invalid base64 characters detected'
);


it('unsubscribes from transport whenever connection closes', function () {
$transport = new PHPTransport(size: 1000);
$controller = new SubscribeController();
$controller->setHub(new Hub(transport: $transport));
$stream = new ThroughStreamStub();

// Given
$hey = new Message(data: 'Hey!');
$hello = new Message(data: 'Hello');
$world = new Message(data: 'World!');
$transport->publish(new Update(['/bar'], $hey)); // Should not be dumped into stream
$transport->publish(new Update(['/foo'], $hello));
$request = new ServerRequest(
'GET',
'/.well-known/mercure?topic=/foo',
['Last-Event-ID' => 'earliest'],
);

// When
$response = $controller($request, $stream);
Loop::addTimer(0.01, fn () => Loop::stop());
Loop::futureTick(fn () => $stream->close());
Loop::futureTick(fn () => $transport->publish(new Update(['/foo'], $world)));
Loop::run();

// Then
expect($response->getStatusCode())->toBe(200);
expect($response->getHeaderLine('Content-Type'))->toBe('text/event-stream');
expect($stream->storage)->toHaveCount(1);
expect($stream->storage[0])->toBe((string) $hello);
});
2 changes: 1 addition & 1 deletion tests/Unit/Hub/Controller/ThroughStreamStub.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,6 @@ public function end($data = null): void

public function close(): void
{
throw new ShouldNotHappen(new LogicException(__METHOD__));
$this->emit('close');
}
}
7 changes: 7 additions & 0 deletions tests/Unit/Hub/HubTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@ public function subscribe(callable $callback): void
$this->called['subscribe'] = func_get_args();
}

public function unsubscribe(callable $callback): void
{
$this->called['unsubscribe'] = func_get_args();
}

public function reconciliate(string $lastEventID): Generator
{
$this->called['reconciliate'] = func_get_args();
Expand All @@ -49,11 +54,13 @@ public function reconciliate(string $lastEventID): Generator
$hub->publish($update);
$hub->subscribe($subscribeFn);
iterator_to_array($hub->reconciliate($lastEventId));
$hub->unsubscribe($subscribeFn);

// Then
expect($transport->called['publish'])->toBe([$update]);
expect($transport->called['subscribe'])->toBe([$subscribeFn]);
expect($transport->called['reconciliate'])->toBe([$lastEventId]);
expect($transport->called['unsubscribe'])->toBe([$subscribeFn]);
});

it('complains when requesting an unrecognized option', function () {
Expand Down
14 changes: 13 additions & 1 deletion tests/Unit/Hub/Transport/PHP/PHPTransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,20 @@
$transport = new PHPTransport();
$update = new Update(['/foo'], new Message());
$subscriber = (object) ['received' => null];
$transport->subscribe(fn ($receivedUpdate) => $subscriber->received = $receivedUpdate);
$callback = fn($receivedUpdate) => $subscriber->received = $receivedUpdate;
$transport->subscribe($callback);

// When
$transport->publish($update);

// Then
expect($subscriber->received ?? null)->toBe($update);

// When
$transport->unsubscribe($callback);
$transport->publish(new Update(['/foo'], new Message('bar')));

// Then
expect($subscriber->received ?? null)->toBe($update);
});

Expand Down
8 changes: 7 additions & 1 deletion tests/Unit/Hub/Transport/Redis/RedisTransportTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,20 @@
// Given
$subscriber = (object) ['received' => null];
$update = new Update(['/foo'], new Message('bar'));
$callback = fn($receivedUpdate) => $subscriber->received = $receivedUpdate;

// When
$transport->subscribe(fn($receivedUpdate) => $subscriber->received = $receivedUpdate);
$transport->subscribe($callback);
$transport->publish($update);

// Then
expect($subscriber->received ?? null)->not()->toBe($update); // Because serialization/deserialization
expect($subscriber->received ?? null)->toEqual($update);

// When
$transport->unsubscribe($callback);
$transport->publish(new Update(['/foo'], new Message('foobar')));
expect($subscriber->received ?? null)->toEqual($update);
});

it('performs state reconciliation', function () {
Expand Down