Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Memcached Reporter #206

Merged
merged 24 commits into from
Jun 30, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
"require": {
"php": "^7.3 || ^8.0",
"ext-curl": "*",
"ext-memcached": "*",
Clivern marked this conversation as resolved.
Show resolved Hide resolved
"psr/http-message": "~1.0",
"psr/log": "^1.0"
},
Expand Down
111 changes: 111 additions & 0 deletions src/Zipkin/Reporters/Aggregation/MemcachedClient.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
<?php

declare(strict_types=1);

namespace Zipkin\Reporters\Aggregation;

use Memcached;
use Exception;

class MemcachedClient
{
const GET_EXTENDED = Memcached::GET_EXTENDED;

/**
* @var Memcached
*/
private $client;

/**
* @var string
*/
private $server;

/**
* @var int
*/
private $port;

/**
* @param string $server
* @param int $port
* @param bool $enableCompression
*/
public function __construct(
string $server = '127.0.0.1',
int $port = 11211,
bool $enableCompression = true
Clivern marked this conversation as resolved.
Show resolved Hide resolved
) {
$this->server = $server;
$this->port = $port;

$this->client = new Memcached();
$this->client->setOption(Memcached::OPT_COMPRESSION, $enableCompression);
$this->client->addServer($this->server, $this->port);
}

/**
* Check connection
*
* @return bool
*/
public function ping(): bool
{
if (false === @fsockopen($this->server, $this->port)) {
throw new Exception(
"Unable to connect to memcached server {$this->server}:{$this->port}"
);
}

return true;
}

/**
* Set an item
*
* @param string $key
* @param mixed $value
* @param int $expiration
*/
public function set($key, $value, $expiration = 0): bool
{
return $this->client->set($key, $value, $expiration);
}

/**
* Get item by key.
*
* @param string $key
* @param mixed $cacheCallback
* @param int $flags
*
* @return mixed
*/
public function get($key, $cacheCallback = null, $flags = null)
{
return $this->client->get($key, $cacheCallback, $flags);
}

/**
* Compare and swap an item.
*
* @param float $casToken
* @param string $key
* @param mixed $value
* @param int $expiration
*/
public function cas($casToken, $key, $value, $expiration = 0): bool
Clivern marked this conversation as resolved.
Show resolved Hide resolved
{
return $this->client->cas($casToken, $key, $value, $expiration);
}

/**
* Quit all connections.
*
* @return bool
*/
public function quit(): bool
{
return $this->client->quit();
}
}
6 changes: 4 additions & 2 deletions src/Zipkin/Reporters/Http.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public function __construct(
}

/**
* @param ReadbackSpan[] $spans
* @param ReadbackSpan[]|array $spans
* @return void
*/
public function report(array $spans): void
Expand All @@ -82,7 +82,8 @@ public function report(array $spans): void
return;
}

$payload = $this->serializer->serialize($spans);
$payload = (is_object($spans[0])) ? $this->serializer->serialize($spans) : json_encode($spans);
Clivern marked this conversation as resolved.
Show resolved Hide resolved

if ($payload === false) {
$this->logger->error(
\sprintf('failed to encode spans with code %d', \json_last_error())
Expand All @@ -91,6 +92,7 @@ public function report(array $spans): void
}

$client = $this->clientFactory->build($this->options);

try {
$client($payload);
} catch (RuntimeException $e) {
Expand Down
176 changes: 176 additions & 0 deletions src/Zipkin/Reporters/Memcached.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
<?php

declare(strict_types=1);

namespace Zipkin\Reporters;

use Zipkin\Reporter;
use Psr\Log\NullLogger;
use Psr\Log\LoggerInterface;
use Zipkin\Reporters\SpanSerializer;
use Zipkin\Reporters\JsonV2Serializer;
use Zipkin\Reporters\Aggregation\MemcachedClient;
use Exception;

final class Memcached implements Reporter
{
public const DEFAULT_OPTIONS = [
'cache_key' => 'zipkin_traces',
];

/**
* @var array
*/
private $options;

/**
* @var MemcachedClient
*/
private $memcachedClient;

/**
* @var LoggerInterface
*/
private $logger;

/**
* @var SpanSerializer
*/
private $serializer;

/**
* @param array $options
* @param MemcachedClient $memcachedClient
* @param LoggerInterface $logger
* @param SpanSerializer $serializer
*/
public function __construct(
array $options = [],
jcchavezs marked this conversation as resolved.
Show resolved Hide resolved
MemcachedClient $memcachedClient = null,
LoggerInterface $logger = null,
SpanSerializer $serializer = null
) {
$this->options = \array_merge(self::DEFAULT_OPTIONS, $options);
$this->memcachedClient = $memcachedClient ?? new MemcachedClient();
$this->logger = $logger ?? new NullLogger();
$this->serializer = $serializer ?? new JsonV2Serializer();
}

/**
* @param array $spans
*/
public function report(array $spans): void
{
try {
$this->memcachedClient->ping();

// Fetch stored spans
$result = $this->memcachedClient->get(
Clivern marked this conversation as resolved.
Show resolved Hide resolved
$this->options['cache_key'],
null,
MemcachedClient::GET_EXTENDED
);

$payload = $this->serializer->serialize($spans);

if ($payload === false) {
$this->logger->error(
\sprintf('failed to encode spans with code %d', \json_last_error())
);
}

// Store spans if there aren't any previous spans
if (empty($result)) {
$this->memcachedClient->set($this->options['cache_key'], $payload);
$this->memcachedClient->quit();
return;
}

$status = false;

// Merge the new spans with the stored spans only if
// the item not updated by a different concurrent proceess
while (!$status) {
$result['value'] = array_merge(
Clivern marked this conversation as resolved.
Show resolved Hide resolved
json_decode($result['value'], true),
json_decode($payload, true)
);

$status = $this->memcachedClient->cas(
$result['cas'],
$this->options['cache_key'],
json_encode($result['value'])
);

if (!$status) {
$result = $this->memcachedClient->get(
Clivern marked this conversation as resolved.
Show resolved Hide resolved
$this->options['cache_key'],
null,
MemcachedClient::GET_EXTENDED
);
}
}

$this->memcachedClient->quit();
} catch (Exception $e) {
$this->logger->error(
Clivern marked this conversation as resolved.
Show resolved Hide resolved
\sprintf('Error while calling memcached server: %s', $e->getMessage())
);
}

return;
}

/**
* @return array
*/
public function flush(): array
{
try {
$this->memcachedClient->ping();

// Fetch stored spans
$result = $this->memcachedClient->get(
$this->options['cache_key'],
null,
MemcachedClient::GET_EXTENDED
);

if (empty($result)) {
$this->memcachedClient->quit();

return [];
}

$status = false;

// Return stored spans and set the key value as empty only if
// the item not updated by a different concurrent proceess
while (!$status) {
$status = $this->memcachedClient->cas(
$result['cas'],
$this->options['cache_key'],
json_encode([])
);

if (!$status) {
$result = $this->memcachedClient->get(
$this->options['cache_key'],
null,
MemcachedClient::GET_EXTENDED
);
}
}

$this->memcachedClient->quit();

return json_decode($result['value'], true);
} catch (Exception $e) {
$this->logger->error(
\sprintf('Error while calling memcached server: %s', $e->getMessage())
);
}

return [];
}
}
Loading