Skip to content

Commit

Permalink
Merge master into add-php-8.4
Browse files Browse the repository at this point in the history
  • Loading branch information
mateusjunges committed Dec 19, 2024
2 parents cbdf7c8 + 161361d commit 3411f7b
Show file tree
Hide file tree
Showing 9 changed files with 62 additions and 4 deletions.
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

All relevant changes to `mateusjunges/laravel-kafka` will be documented here.

##[2024-11-22 v2.4.1](https://github.com/mateusjunges/laravel-kafka/compare/v2.4.0...v2.4.1)
* Allow to customize flush settings (retries + timeout) by @sash in [#317](https://github.com/mateusjunges/laravel-kafka/pull/317)

##[2024-11-04 v2.4.0](https://github.com/mateusjunges/laravel-kafka/compare/v2.3.2...v2.4.0)
* Allow to use other types of Keys other than strings by @mateusjunges in [#322](https://github.com/mateusjunges/laravel-kafka/pull/322)

Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
"phpunit/phpunit": "^10.5",
"orchestra/testbench": "^7.16|^8.0",
"predis/predis": "^1",
"friendsofphp/php-cs-fixer": "^3.62",
"friendsofphp/php-cs-fixer": "^3.64",
"kwn/php-rdkafka-stubs": "^2.2.1",
"rector/rector": "^0.19.8"
},
"minimum-stability": "dev",
Expand Down
10 changes: 10 additions & 0 deletions config/kafka.php
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@
*/
'flush_retry_sleep_in_ms' => 100,

/*
* The number of retries that will be used when flushing the producer
*/
'flush_retries' => 10,

/**
* The flush timeout in milliseconds
*/
'flush_timeout_in_ms' => 1000,

/*
| The cache driver that will be used
*/
Expand Down
2 changes: 1 addition & 1 deletion docs/introduction.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ title: Introduction
weight: 1
---

Do you use Kafka in your laravel projects? All packages I've seen until today, including some built by myself, does not provide a nice syntax usage syntax or, when it does, the test process with these packages are very painful.
Do you use Kafka in your laravel projects? Every package I've seen until today, including some built by myself, does not provide a nice syntax usage syntax or, when it does, the test process with these packages are very painful.

This package provides a nice way of producing and consuming kafka messages in your Laravel projects.

Expand Down
2 changes: 2 additions & 0 deletions src/Config/Config.php
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public function __construct(
private readonly int $maxTime = 0,
private readonly array $partitionAssignment = [],
private readonly ?Closure $whenStopConsuming = null,
public readonly ?int $flushRetries = null,
public readonly ?int $flushTimeoutInMs = null,
) {
}

Expand Down
4 changes: 4 additions & 0 deletions src/Contracts/MessageProducer.php
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public function withMessage(ProducerMessage $message): self;
/** Set Sasl configuration. */
public function withSasl(string $username, string $password, string $mechanisms, string $securityProtocol = 'SASL_PLAINTEXT'): self;

public function withFlushRetries(int $retries): self;

public function withFlushTimeout(int $timeoutInMs): self;

/** Specifies which serializer should be used. */
public function usingSerializer(MessageSerializer $serializer): self;

Expand Down
18 changes: 18 additions & 0 deletions src/Producers/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class Builder implements MessageProducer
private readonly string $broker;
private bool $isTransactionProducer = false;
private int $maxTransactionRetryAttempts = 5;
private ?int $flushRetries = null;
private ?int $flushTimeoutInMs = null;

public function __construct(
?string $broker = null,
Expand Down Expand Up @@ -166,6 +168,20 @@ public function withDebugDisabled(): self
return $this->withDebugEnabled(false);
}

public function withFlushRetries(int $retries): self
{
$this->flushRetries = $retries;

return $this;
}

public function withFlushTimeout(int $timeoutInMs): self
{
$this->flushTimeoutInMs = $timeoutInMs;

return $this;
}

/**
* Send the given message to Kakfa.
*
Expand Down Expand Up @@ -212,6 +228,8 @@ public function build(): Producer
sasl: $this->saslConfig,
customOptions: $this->options,
callbacks: $this->callbacks,
flushRetries: $this->flushRetries,
flushTimeoutInMs: $this->flushTimeoutInMs,
);

$producer = app(Producer::class, [
Expand Down
6 changes: 4 additions & 2 deletions src/Producers/Producer.php
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,12 @@ public function flush(): mixed
// after sending all messages with Producer::sendBatch
$flush = function () {
$sleepMilliseconds = config('kafka.flush_retry_sleep_in_ms', 100);
$retries = $this->config->flushRetries ?? config('kafka.flush_retries', 10);
$timeout = $this->config->flushTimeoutInMs ?? config('kafka.flush_timeout_in_ms', 1000);

try {
return retry(10, function () {
$result = $this->producer->flush(1000);
return retry($retries, function () use ($timeout) {
$result = $this->producer->flush($timeout);

if (RD_KAFKA_RESP_ERR_NO_ERROR === $result) {
return true;
Expand Down
18 changes: 18 additions & 0 deletions src/Support/Testing/Fakes/ProducerBuilderFake.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ class ProducerBuilderFake implements MessageProducer
private ?Sasl $saslConfig = null;
private string $topic = '';
private ?Closure $producerCallback = null;
private ?int $flushRetries = null;
private ?int $flushTimeoutInMs = null;

public function __construct(
private readonly ?string $broker = null,
Expand Down Expand Up @@ -153,6 +155,20 @@ public function withSasl(string $username, string $password, string $mechanisms,
return $this;
}

public function withFlushRetries(int $retries): self
{
$this->flushRetries = $retries;

return $this;
}

public function withFlushTimeout(int $timeoutInMs): self
{
$this->flushTimeoutInMs = $timeoutInMs;

return $this;
}

/** Specifies which serializer should be used. */
public function usingSerializer(MessageSerializer $serializer): MessageProducer
{
Expand Down Expand Up @@ -210,6 +226,8 @@ public function build(): ProducerFake
sasl: $this->saslConfig,
customOptions: $this->options,
callbacks: $this->callbacks,
flushRetries: $this->flushRetries,
flushTimeoutInMs: $this->flushTimeoutInMs,
);

return $this->makeProducer($conf);
Expand Down

0 comments on commit 3411f7b

Please sign in to comment.