This package can be used for easy access to the RabbitMQ entities like connections or queues.
The ext-amqp PHP extension must be installed. Installation steps can be found in the Dockerfile.
Require this package with composer using the following command:
$ composer require avto-dev/amqp-rabbit-manager "^2.0"
Composer must be installed (see how to install Composer).
Pin the major version of the package, as the "^2.0" constraint above does.
After that, publish the package configuration file using the following command:
$ php ./artisan vendor:publish --provider='AvtoDev\AmqpRabbitManager\ServiceProvider'
Then configure it in the file ./config/rabbitmq.php.
First, run the rabbit:setup command to create all queues and exchanges on the RabbitMQ server.
Then, anywhere in your application, you can resolve the connection or queue/exchange factories. For example, in an artisan command:
<?php

namespace App\Console\Commands;

use AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface;

class SomeCommand extends \Illuminate\Console\Command
{
    /**
     * The console command name.
     *
     * @var string
     */
    protected $name = 'some:command';

    /**
     * Execute the console command.
     *
     * @param ConnectionsFactoryInterface $connections
     *
     * @return void
     */
    public function handle(ConnectionsFactoryInterface $connections): void
    {
        $connections->default(); // Get the default RabbitMQ connection instance
    }
}
The declare queue operation creates a queue on the broker side (use the rabbit:setup command instead of doing this manually):
<?php
/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */
$connections
    ->default()
    ->declareQueue($queues->make('some-queue-id'));
The declare exchange operation creates a topic on the broker side (use the rabbit:setup command instead of doing this manually):
<?php
/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */
$connections
    ->default()
    ->declareTopic($exchanges->make('some-exchange-id'));
Binding connects a queue to an exchange, so messages from that topic arrive in the queue and can be processed (prefer the rabbit:setup command and the \AvtoDev\AmqpRabbitManager\Commands\Events\* events over doing this manually):
<?php
/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */
/** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */
$connections
    ->default()
    ->bind(new \Interop\Amqp\Impl\AmqpBind(
        $exchanges->make('some-exchange-id'),
        $queues->make('some-queue-id')
    ));
Create a message and send it to the exchange:
<?php
/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\ExchangesFactoryInterface $exchanges */
$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->send($exchanges->make('some-exchange-id'), $message);
Create a message and send it to the queue:
<?php
/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */
$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->send($queues->make('some-queue-id'), $message);
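The examples above pass a plain string to createMessage(). When the payload is structured data, it has to be serialized first. The following is a minimal sketch that assumes JSON as the wire format; encodeMessageBody() and decodeMessageBody() are hypothetical helpers and are not part of the package.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): serialize a payload
 * into a JSON string that can be used as a message body.
 *
 * @param array<string, mixed> $payload
 *
 * @throws \JsonException When the payload cannot be encoded
 */
function encodeMessageBody(array $payload): string
{
    return \json_encode($payload, JSON_THROW_ON_ERROR | JSON_UNESCAPED_UNICODE);
}

/**
 * Hypothetical counterpart for the consumer side.
 *
 * @return array<string, mixed>
 *
 * @throws \JsonException            When the body is not valid JSON
 * @throws \UnexpectedValueException When the JSON is valid but is not an object/array
 */
function decodeMessageBody(string $body): array
{
    $decoded = \json_decode($body, true, 512, JSON_THROW_ON_ERROR);

    if (! \is_array($decoded)) {
        throw new \UnexpectedValueException('Message body must decode to an array');
    }

    return $decoded;
}
```

The encoded string could then be passed as the body, for example $connection->createMessage(encodeMessageBody(['id' => 42])).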
Message priority is used for message ordering:
<?php
/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */
$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->setPriority(10)
    // ...
    ->send($queues->make('some-queue-id'), $message);
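RabbitMQ only honours priorities on queues declared with the x-max-priority argument (an integer from 1 to 255), and it treats any priority above that maximum as the maximum. The sketch below makes that clamping explicit on the producer side. normalizePriority() is a hypothetical helper, and the default maximum of 10 is an assumption; use whatever value your queue is actually declared with.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): clamp a priority into
 * the range [0, $maxPriority] before passing it to setPriority().
 *
 * @throws \InvalidArgumentException When $maxPriority is outside 1..255
 */
function normalizePriority(int $priority, int $maxPriority = 10): int
{
    // RabbitMQ accepts x-max-priority values between 1 and 255
    if ($maxPriority < 1 || $maxPriority > 255) {
        throw new \InvalidArgumentException('Max priority must be between 1 and 255');
    }

    return \max(0, \min($priority, $maxPriority));
}
```

It could be used as ->setPriority(normalizePriority($requested)) in the producer chain shown above.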
Message expiration, also known as message TTL:
<?php
/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */
$connection = $connections->default();
$message    = $connection->createMessage('Hello world!');

$connection
    ->createProducer()
    ->setTimeToLive(60000) // 60 seconds, in milliseconds
    // ...
    ->send($queues->make('some-queue-id'), $message);
Avoid the enqueue/amqp-tools delay strategies if you can. Implementing the delay manually gives you full control over it.
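The text does not say which manual technique it has in mind. One common approach with RabbitMQ, assumed here, is a holding queue with a message TTL and a dead-letter exchange: messages wait in the holding queue until the TTL expires and are then routed to the real destination. The sketch below only builds the queue arguments; buildDelayQueueArguments() is a hypothetical helper, while the x-* keys are standard RabbitMQ queue arguments.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): build the arguments for
 * a "holding" queue that dead-letters expired messages to a target exchange.
 *
 * @param int    $delayMs          Delay in milliseconds
 * @param string $targetExchange   Exchange that receives messages after the delay
 *                                 ('' means the default exchange)
 * @param string $targetRoutingKey Routing key used when dead-lettering
 *
 * @return array<string, int|string>
 *
 * @throws \InvalidArgumentException When the delay is negative
 */
function buildDelayQueueArguments(int $delayMs, string $targetExchange, string $targetRoutingKey = ''): array
{
    if ($delayMs < 0) {
        throw new \InvalidArgumentException('Delay must not be negative');
    }

    $arguments = [
        'x-message-ttl'          => $delayMs,
        'x-dead-letter-exchange' => $targetExchange,
    ];

    // Without this key RabbitMQ reuses the message's original routing key
    if ($targetRoutingKey !== '') {
        $arguments['x-dead-letter-routing-key'] = $targetRoutingKey;
    }

    return $arguments;
}
```

How these arguments get attached to a queue in this package (for instance through ./config/rabbitmq.php or the queue object's arguments) is not covered by the source, so check the package configuration before relying on this.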
Get a single message and continue script execution:
<?php
/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */
$consumer = $connections->default()->createConsumer($queues->make('some-queue-id'));
$message  = $consumer->receive();

try {
    // .. process the message ..
    $consumer->acknowledge($message);
} catch (\Exception $e) {
    // .. handle the exception ..
    $consumer->reject($message);
}
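The acknowledge-or-reject pattern above tends to be repeated for every consumer, so it can be factored out. The following is a minimal sketch with a hypothetical helper, handleWithAck(), that takes the processing step and the two outcomes as callables; it does not depend on the package and is not part of it.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): run $handler and report
 * the outcome through the given callbacks.
 *
 * @param callable $handler   Processes the message
 * @param callable $onSuccess Called with no arguments when $handler returns normally
 * @param callable $onFailure Called with the caught \Throwable when $handler throws
 *
 * @return bool True when the handler succeeded, false otherwise
 */
function handleWithAck(callable $handler, callable $onSuccess, callable $onFailure): bool
{
    try {
        $handler();
    } catch (\Throwable $e) {
        $onFailure($e);

        return false;
    }

    // Called outside the try block, so a failing acknowledgement
    // is not followed by a reject of the same message
    $onSuccess();

    return true;
}
```

With a real consumer, $onSuccess would typically wrap $consumer->acknowledge($message) and $onFailure would wrap $consumer->reject($message). Unlike the example above, this sketch catches \Throwable rather than \Exception, which is a design choice and not something the source prescribes.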
Start a (nearly) infinite loop for message processing (you can run more than one consumer at a time):
<?php
/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */
$connection = $connections->default();
$queue      = $queues->make('some-queue-id');
$consumer   = $connection->createConsumer($queue);
$subscriber = $connection->createSubscriptionConsumer();

$subscriber->subscribe(
    $consumer,
    function (\Interop\Amqp\AmqpMessage $message, \Enqueue\AmqpExt\AmqpConsumer $consumer): bool {
        try {
            // .. process the message ..
            $consumer->acknowledge($message);
        } catch (\Exception $e) {
            // .. handle the exception ..
            $consumer->reject($message);

            return false; // Subscription will be cancelled
        }

        return true; // Subscription will be continued
    }
);

$subscriber->consume(); // You can pass a timeout in milliseconds
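Because the callback's boolean return value decides whether the subscription continues, it can also be used to stop after a fixed number of messages, which is handy for workers that should restart periodically. Below is a minimal sketch; limitMessages() is a hypothetical wrapper and not part of the package.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of the package): wrap a message handler so
 * that the resulting callback returns false once $limit messages were handled.
 *
 * @param callable $handler Receives the message and the consumer
 * @param int      $limit   Number of messages to handle before stopping
 *
 * @throws \InvalidArgumentException When $limit is less than 1
 */
function limitMessages(callable $handler, int $limit): \Closure
{
    if ($limit < 1) {
        throw new \InvalidArgumentException('Limit must be at least 1');
    }

    $processed = 0;

    return static function ($message, $consumer) use ($handler, $limit, &$processed): bool {
        $handler($message, $consumer);
        $processed++;

        // true keeps the subscription running, false cancels it
        return $processed < $limit;
    };
}
```

The returned closure could be passed as the second argument of $subscriber->subscribe(). Acknowledging or rejecting the message remains the responsibility of the wrapped handler.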
Remove all messages from the queue:
<?php
/** @var \AvtoDev\AmqpRabbitManager\ConnectionsFactoryInterface $connections */
/** @var \AvtoDev\AmqpRabbitManager\QueuesFactoryInterface $queues */
$connection = $connections->default();
$connection->purgeQueue($queues->make('some-queue-id'));
For package testing we use the phpunit framework, with docker-ce and docker-compose as the development environment. After cloning the repository, run the following in your terminal:
$ make build
$ make latest # or 'make lowest'
$ make test
Changes log can be found here.
If you find any errors in the package, please open an issue in the current repository.
This is open-sourced software licensed under the MIT License.