[JOBS] AMQP redialer does not resume pipeline (sometimes?) #2012
-
Hello, I'm using RR (via Spiral framework) mainly for RabbitMQ message processing. Problem is, that network connection is not very stable, as RR is running in multiple (100+) servers across the globe. I've configured high timeout values, so RR would eventually reconnect to RabbitMQ server and continue to process messages without intervention. Currently each server consumes at least 4 different queues. The pipelines for those queues are created dynamically: public function declareQueue(string $queueName, array $options = []): void
{
$jobs = new Jobs(RPC::create(RPCConst::CONNECTION));
if (!$this->doesQueueExist($jobs, $queueName)) {
$jobs->create(self::getAMPQCreateInfo($queueName, $options));
}
}
/**
* @throws JobsException
*/
private function doesQueueExist(Jobs $jobs, string $queueName): bool
{
foreach ($jobs->getIterator() as $queue) {
if ($queue->getName() === $queueName) {
return true;
}
}
return false;
}
public static function getAMPQCreateInfo(string $queueName, array $options = []): AMQPCreateInfo
{
$nodeQueueName = sprintf('%d.%s', env('SERVER_ID'), $queueName);
return new AMQPCreateInfo(
name: $queueName, // @phpstan-ignore argument.type
priority: $options['priority'] ?? CreateInfo::PRIORITY_DEFAULT_VALUE,
prefetch: $options['prefetch'] ?? AMQPCreateInfo::PREFETCH_DEFAULT_VALUE,
queue: $options['queue_name'] ?? $nodeQueueName,
exchange: QueueConst::EXCHANGE_NAME,
exchangeType: ExchangeType::Topics,
routingKey: $options['queue_name'] ?? $nodeQueueName,
durable: $options['durable'] ?? true,
exchangeDurable: true,
consumeAll: $options['consume_all'] ?? false,
queueHeaders: array_merge($options['queue_headers'] ?? [], [
'x-queue-type' => 'quorum'
]),
redialTimeout: 2 * 24 * 3600
);
} Everything is working ok, but there are some cases, when redialer restores connection to RabbitMQ and leaves the pipeline paused, which means that message processing stops and RR needs to be restarted or job queue resumed, to continue working. Also, I cannot reproduce it locally - when breaking connection to RabbitMQ, RR always resumes pipeline. What is strange that, when this case occurs, last message of log claims, that redialer exited. Is this expected behaviour? Is there something I can do, to automatically resume pipeline, once RR restores connection? Is there a difference if pipelines are declared dynamically (as currently) or statically, in .rr.yaml file?
.rr.yaml: version: '3'
rpc:
listen: 'tcp://127.0.0.1:6001'
jobs:
timeout: 172800
pool:
num_workers: 1
consume:
- flow
- action
server:
command: 'php app.php'
relay: pipes
env:
- XDEBUG_SESSION: '1'
metrics:
address: '127.0.0.1:2112'
amqp:
addr: 'amqp://<redacted>:<redacted>@<redacted>:5672/'
endure:
log_level: error
watchdog_sec: 60 |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments 25 replies
-
Hey @algirdasci 👋 |
Beta Was this translation helpful? Give feedback.
-
Sure! I'm using latest RR build:
|
Beta Was this translation helpful? Give feedback.
I think I found the reason. There is might be a case, when the channel for messages delivery could be closed by the RabbitMQ server, without connection interruption. In that case, RR will close the delivery channel but does not create a new one, since the trigger for redial is broken connection.
I wrote a new Redialer which can be applied to channels + connections and recreate a delivery channel in that case. Expected release: 2024.3.1. Could you please create a bug ticket for that?