Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Added retryInterval, maxRetries and maxRetryDelay to WriteApi #32

Merged
merged 6 commits into from
Jul 29, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## 1.6.0 [unreleased]

### Features
1. [#32](https://github.com/influxdata/influxdb-client-php/pull/32): Added retryInterval, maxRetries and maxRetryDelay to WriteOptions in WriteApi

## 1.5.0 [2020-07-17]

### Features
Expand Down
5 changes: 4 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,11 @@ The writes are processed in batches which are configurable by `WriteOptions`:
| Property | Description | Default Value |
| --- | --- | --- |
| **writeType** | type of write SYNCHRONOUS / BATCHING / | SYNCHRONOUS |
| **batchSize** | the number of data point to collect in batch | 1000 |
| **batchSize** | the number of data point to collect in batch | 10 |
| **flushInterval** | the number of milliseconds before the batch is written | 1000 |
| **retryInterval** | the number of milliseconds to retry unsuccessful write. The retry interval is "exponentially" used when the InfluxDB server does not specify "Retry-After" header. | 1000 |
| **maxRetries** | the number of max retries when write fails | 3 |
| **maxRetryDelay** | maximum delay when retrying write in milliseconds | 15000 |

```php
use InfluxDB2\Client;
Expand Down
27 changes: 26 additions & 1 deletion src/InfluxDB2/WriteApi.php
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,32 @@ public function writeRaw(string $data, string $precision = null, string $bucket

$queryParams = ["org" => $orgParam, "bucket" => $bucketParam, "precision" => $precisionParam];

$this->post($data, "/api/v2/write", $queryParams);
$this->writeRawInternal($data, $queryParams, 1, $this->writeOptions->retryInterval);
}

private function writeRawInternal(string $data, array $queryParams, int $attempts, int $retryInterval)
{
try {
$this->post($data, "/api/v2/write", $queryParams);
} catch (ApiException $e) {
$code = $e->getCode();

if ($code == null || !($code == 429 || $code == 503) || $attempts > $this->writeOptions->maxRetries) {
throw $e;
}

$headers = $e->getResponseHeaders();

if (array_key_exists('Retry-After', $headers)) {
$timeout = (int)$headers['Retry-After'][0] * 1000000.0;
} else {
$timeout = min($retryInterval, $this->writeOptions->maxRetryDelay) * 1000.0;
}

usleep($timeout);

$this->writeRawInternal($data, $queryParams, $attempts + 1, $retryInterval * 2);
}
}

public function close()
Expand Down
13 changes: 13 additions & 0 deletions src/InfluxDB2/WriteOptions.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,27 @@ class WriteOptions
{
const DEFAULT_BATCH_SIZE = 10;
const DEFAULT_FLUSH_INTERVAL = 1000;
const DEFAULT_RETRY_INTERVAL = 1000;
const DEFAULT_MAX_RETRIES = 3;
const DEFAULT_MAX_RETRY_DELAY = 15000;

public $writeType;
public $batchSize;
public $flushInterval;
public $retryInterval;
public $maxRetries;
public $maxRetryDelay;

/**
* WriteOptions constructor.
* $writeOptions = [
* 'writeType' => methods of write (WriteType::SYNCHRONOUS - default, WriteType::BATCHING)
* 'batchSize' => the number of data point to collect in batch
* 'flushInterval' => flush data at least in this interval
* 'retryInterval' => number of milliseconds to retry unsuccessful write
* 'maxRetries' => max number of retries when write fails
* The retry interval is used when the InfluxDB server does not specify "Retry-After" header.
* 'maxRetryDelay' => maximum delay when retrying write
* ]
* @param array $writeOptions Array containing the write parameters (See above)
*/
Expand All @@ -26,6 +36,9 @@ public function __construct(array $writeOptions = null)
$this->writeType = $writeOptions["writeType"] ?? WriteType::SYNCHRONOUS;
$this->batchSize = $writeOptions["batchSize"] ?? self::DEFAULT_BATCH_SIZE;
$this->flushInterval = $writeOptions["flushInterval"] ?? self::DEFAULT_FLUSH_INTERVAL;
$this->retryInterval = $writeOptions["retryInterval"] ?? self::DEFAULT_RETRY_INTERVAL;
$this->maxRetries = $writeOptions["maxRetries"] ?? self::DEFAULT_MAX_RETRIES;
$this->maxRetryDelay = $writeOptions["maxRetryDelay"] ?? self::DEFAULT_MAX_RETRY_DELAY;
}
}

95 changes: 95 additions & 0 deletions tests/WriteApiBatchingTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use GuzzleHttp\Middleware;
use GuzzleHttp\Psr7\Request;
use GuzzleHttp\Psr7\Response;
use InfluxDB2\ApiException;
use InfluxDB2\Point;
use InfluxDB2\WriteType;
use Webmozart\Assert\Assert;

Expand Down Expand Up @@ -122,4 +124,97 @@ public function testFlushAllByCloseClient()
. "h2o_feet,location=coyote_creek level\\ water_level=2.0 2\n"
. 'h2o_feet,location=coyote_creek level\\ water_level=3.0 3', $request->getBody());
}

public function testRetryIntervalByConfig()
{
$errorBody = '{"code":"temporarily unavailable","message":"Token is temporarily over quota.'
. 'The Retry-After header describes when to try the write again."}';

$this->mockHandler->append(new Response(429, ['X-Platform-Error-Code' => 'temporarily unavailable'],
$errorBody), new Response(204));

$this->writeApi->write('h2o_feet,location=coyote_creek water_level=1.0 1');
$this->writeApi->write('h2o_feet,location=coyote_creek water_level=2.0 2');

$this->assertEquals(2, count($this->container));
$request = $this->mockHandler->getLastRequest();

$this->assertEquals('http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns',
strval($request->getUri()));
$this->assertEquals("h2o_feet,location=coyote_creek water_level=1.0 1\n"
. "h2o_feet,location=coyote_creek water_level=2.0 2", $request->getBody()->getContents());
}

public function testRetryIntervalByHeader()
{
$errorBody = '{"code":"temporarily unavailable","message":"Token is temporarily over quota.'
. 'The Retry-After header describes when to try the write again."}';

$this->mockHandler->append(new Response(429, ['X-Platform-Error-Code' => 'temporarily unavailable',
'Retry-After' => '3'],
$errorBody), new Response(204));

$this->writeApi->write('h2o_feet,location=coyote_creek water_level=1.0 1');
$this->writeApi->write('h2o_feet,location=coyote_creek water_level=2.0 2');

$this->assertEquals(2, count($this->container));
$request = $this->mockHandler->getLastRequest();

$this->assertEquals('http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns',
strval($request->getUri()));
$this->assertEquals("h2o_feet,location=coyote_creek water_level=1.0 1\n"
. "h2o_feet,location=coyote_creek water_level=2.0 2", $request->getBody()->getContents());
}

public function testRetryIntervalMaxRetries()
{
$errorBody = '{"code":"temporarily unavailable","message":"Token is temporarily over quota.'
. 'The Retry-After header describes when to try the write again."}';

$this->writeApi->writeOptions->maxRetries = 2;

$this->mockHandler->append(
new Response(429, ['X-Platform-Error-Code' => 'temporarily unavailable'], $errorBody),
new Response(429, ['X-Platform-Error-Code' => 'temporarily unavailable'], $errorBody),
new Response(429, ['X-Platform-Error-Code' => 'temporarily unavailable'], $errorBody));

$this->expectException(ApiException::class);

$this->writeApi->write('h2o_feet,location=coyote_creek water_level=1.0 1');
$this->writeApi->write('h2o_feet,location=coyote_creek water_level=2.0 2');

$this->assertEquals(3, count($this->container));
}

public function testRetryCount()
{
$this->mockHandler->append(
// regular call
new Response(429),
// retry
new Response(429),
// retry
new Response(429),
// retry
new Response(429),
// not called
new Response(429));

$this->writeApi->writeOptions->batchSize = 1;

$point = Point::measurement('h2o')
->addTag('location', 'europe')
->addField('level', 2);

try {
$this->writeApi->write($point);
} catch (ApiException $e) {
$this->assertEquals(429, $e->getCode());
}

$this->assertEquals(4, count($this->container));

$count = $this->mockHandler->count();
$this->assertEquals(1, $count);
}
}
30 changes: 30 additions & 0 deletions tests/WriteApiTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -130,4 +130,34 @@ public function testInfluxException()
$this->fail();
}
}

public function testRetryCount()
{
$this->mockHandler->append(
// regular call
new Response(429),
// retry
new Response(429),
// retry
new Response(429),
// retry
new Response(429),
// not called
new Response(429));

$point = Point::measurement('h2o')
->addTag('location', 'europe')
->addField('level', 2);

try {
$this->writeApi->write($point);
} catch (ApiException $e) {
$this->assertEquals(429, $e->getCode());
}

$this->assertEquals(4, count($this->container));

$count = $this->mockHandler->count();
$this->assertEquals(1, $count);
}
}