diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a67de3c..9fe0f3bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ All relevant changes to `mateusjunges/laravel-kafka` will be documented here. +##[2024-11-22 v2.4.1](https://github.com/mateusjunges/laravel-kafka/compare/v2.4.0...v2.4.1) +* Allow to customize flush settings (retries + timeout) by @sash in [#317](https://github.com/mateusjunges/laravel-kafka/pull/317) + ##[2024-11-04 v2.4.0](https://github.com/mateusjunges/laravel-kafka/compare/v2.3.2...v2.4.0) * Allow to use other types of Keys other than strings by @mateusjunges in [#322](https://github.com/mateusjunges/laravel-kafka/pull/322) diff --git a/composer.json b/composer.json index fa5741a4..2b0e0ef6 100644 --- a/composer.json +++ b/composer.json @@ -14,7 +14,8 @@ "phpunit/phpunit": "^10.5", "orchestra/testbench": "^7.16|^8.0", "predis/predis": "^1", - "friendsofphp/php-cs-fixer": "^3.62", + "friendsofphp/php-cs-fixer": "^3.64", + "kwn/php-rdkafka-stubs": "^2.2.1", "rector/rector": "^0.19.8" }, "minimum-stability": "dev", diff --git a/config/kafka.php b/config/kafka.php index 13b318a9..fcfc7889 100644 --- a/config/kafka.php +++ b/config/kafka.php @@ -70,6 +70,16 @@ */ 'flush_retry_sleep_in_ms' => 100, + /* + * The number of retries that will be used when flushing the producer + */ + 'flush_retries' => 10, + + /** + * The flush timeout in milliseconds + */ + 'flush_timeout_in_ms' => 1000, + /* | The cache driver that will be used */ diff --git a/docs/introduction.md b/docs/introduction.md index 11788fd2..9022bb19 100644 --- a/docs/introduction.md +++ b/docs/introduction.md @@ -3,7 +3,7 @@ title: Introduction weight: 1 --- -Do you use Kafka in your laravel projects? All packages I've seen until today, including some built by myself, does not provide a nice syntax usage syntax or, when it does, the test process with these packages are very painful. +Do you use Kafka in your laravel projects? Every package I've seen until today, including some built by myself, does not provide a nice syntax usage syntax or, when it does, the test process with these packages are very painful. This package provides a nice way of producing and consuming kafka messages in your Laravel projects. diff --git a/src/Config/Config.php b/src/Config/Config.php index 800501a5..67e33081 100644 --- a/src/Config/Config.php +++ b/src/Config/Config.php @@ -85,6 +85,8 @@ public function __construct( private readonly int $maxTime = 0, private readonly array $partitionAssignment = [], private readonly ?Closure $whenStopConsuming = null, + public readonly ?int $flushRetries = null, + public readonly ?int $flushTimeoutInMs = null, ) { } diff --git a/src/Contracts/MessageProducer.php b/src/Contracts/MessageProducer.php index aa302640..46a68dba 100644 --- a/src/Contracts/MessageProducer.php +++ b/src/Contracts/MessageProducer.php @@ -50,6 +50,10 @@ public function withMessage(ProducerMessage $message): self; /** Set Sasl configuration. */ public function withSasl(string $username, string $password, string $mechanisms, string $securityProtocol = 'SASL_PLAINTEXT'): self; + public function withFlushRetries(int $retries): self; + + public function withFlushTimeout(int $timeoutInMs): self; + /** Specifies which serializer should be used. */ public function usingSerializer(MessageSerializer $serializer): self; diff --git a/src/Producers/Builder.php b/src/Producers/Builder.php index aecb69e1..8189d799 100644 --- a/src/Producers/Builder.php +++ b/src/Producers/Builder.php @@ -24,6 +24,8 @@ class Builder implements MessageProducer private readonly string $broker; private bool $isTransactionProducer = false; private int $maxTransactionRetryAttempts = 5; + private ?int $flushRetries = null; + private ?int $flushTimeoutInMs = null; public function __construct( ?string $broker = null, @@ -166,6 +168,20 @@ public function withDebugDisabled(): self return $this->withDebugEnabled(false); } + public function withFlushRetries(int $retries): self + { + $this->flushRetries = $retries; + + return $this; + } + + public function withFlushTimeout(int $timeoutInMs): self + { + $this->flushTimeoutInMs = $timeoutInMs; + + return $this; + } + /** * Send the given message to Kakfa. * @@ -212,6 +228,8 @@ public function build(): Producer sasl: $this->saslConfig, customOptions: $this->options, callbacks: $this->callbacks, + flushRetries: $this->flushRetries, + flushTimeoutInMs: $this->flushTimeoutInMs, ); $producer = app(Producer::class, [ diff --git a/src/Producers/Producer.php b/src/Producers/Producer.php index aac50cfb..616bd8e8 100644 --- a/src/Producers/Producer.php +++ b/src/Producers/Producer.php @@ -190,10 +190,12 @@ public function flush(): mixed // after sending all messages with Producer::sendBatch $flush = function () { $sleepMilliseconds = config('kafka.flush_retry_sleep_in_ms', 100); + $retries = $this->config->flushRetries ?? config('kafka.flush_retries', 10); + $timeout = $this->config->flushTimeoutInMs ?? config('kafka.flush_timeout_in_ms', 1000); try { - return retry(10, function () { - $result = $this->producer->flush(1000); + return retry($retries, function () use ($timeout) { + $result = $this->producer->flush($timeout); if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) { return true; diff --git a/src/Support/Testing/Fakes/ProducerBuilderFake.php b/src/Support/Testing/Fakes/ProducerBuilderFake.php index 98f7fd7d..2e3f3769 100644 --- a/src/Support/Testing/Fakes/ProducerBuilderFake.php +++ b/src/Support/Testing/Fakes/ProducerBuilderFake.php @@ -22,6 +22,8 @@ class ProducerBuilderFake implements MessageProducer private ?Sasl $saslConfig = null; private string $topic = ''; private ?Closure $producerCallback = null; + private ?int $flushRetries = null; + private ?int $flushTimeoutInMs = null; public function __construct( private readonly ?string $broker = null, @@ -153,6 +155,20 @@ public function withSasl(string $username, string $password, string $mechanisms, return $this; } + public function withFlushRetries(int $retries): self + { + $this->flushRetries = $retries; + + return $this; + } + + public function withFlushTimeout(int $timeoutInMs): self + { + $this->flushTimeoutInMs = $timeoutInMs; + + return $this; + } + /** Specifies which serializer should be used. */ public function usingSerializer(MessageSerializer $serializer): MessageProducer { @@ -210,6 +226,8 @@ public function build(): ProducerFake sasl: $this->saslConfig, customOptions: $this->options, callbacks: $this->callbacks, + flushRetries: $this->flushRetries, + flushTimeoutInMs: $this->flushTimeoutInMs, ); return $this->makeProducer($conf);