diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f4546c5..d7ec8ce 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -24,6 +24,7 @@ jobs: with: php-version: ${{ matrix.php-versions }} coverage: xdebug #optional + extensions: memcached - name: Get composer cache directory id: composer-cache run: echo "::set-output name=dir::$(composer config cache-files-dir)" diff --git a/src/Zipkin/Reporters/Aggregation/MemcachedClient.php b/src/Zipkin/Reporters/Aggregation/MemcachedClient.php new file mode 100644 index 0000000..a8b1da2 --- /dev/null +++ b/src/Zipkin/Reporters/Aggregation/MemcachedClient.php @@ -0,0 +1,131 @@ + true, + ]; + + /** + * @var \Memcached + */ + private $client; + + /** + * @var string + */ + private $server; + + /** + * @var int + */ + private $port; + + /** + * @var int + */ + private $timeout; + + /** + * @param string $server + * @param int $port + * @param int $timeout + * @param array $options + */ + public function __construct( + string $server = '127.0.0.1', + int $port = 11211, + int $timeout = 30, + array $options = [] + ) { + $this->server = $server; + $this->port = $port; + $this->timeout = $timeout; + + if (!class_exists('\Memcached')) { + throw new Exception("PHP ext-memcached is required"); + } + + $this->client = new \Memcached(); + $this->client->addServer($this->server, $this->port); + + $options = \array_merge(self::DEFAULT_OPTIONS, $options); + foreach ($options as $key => $value) { + $this->client->setOption($key, $value); + } + } + + /** + * Check connection + * + * @return bool + */ + public function ping(): bool + { + if (false === @fsockopen($this->server, $this->port, $errno, $errstr, $this->timeout)) { + throw new Exception(sprintf( + "Unable to connect to memcached server {$this->server}:{$this->port}: %s", + $errstr + )); + } + + return true; + } + + /** + * Set an item + * + * @param string $key + * @param mixed $value + * @param int $expiration + */ + public function set($key, $value, $expiration = 0): bool + { + return $this->client->set($key, $value, $expiration); + } + + /** + * Get item by key. + * + * @param string $key + * @param mixed $cacheCallback + * @param int $flags + * + * @return mixed + */ + public function get($key, $cacheCallback = null, $flags = 0) + { + return $this->client->get($key, $cacheCallback, $flags); + } + + /** + * Compare and swap an item. + * + * @param float $casToken + * @param string $key + * @param mixed $value + * @param int $expiration + */ + public function compareAndSwap($casToken, $key, $value, $expiration = 0): bool + { + return $this->client->cas($casToken, $key, $value, $expiration); + } + + /** + * Quit all connections. + * + * @return bool + */ + public function quit(): bool + { + return $this->client->quit(); + } +} diff --git a/src/Zipkin/Reporters/Http.php b/src/Zipkin/Reporters/Http.php index d507290..b15e9af 100644 --- a/src/Zipkin/Reporters/Http.php +++ b/src/Zipkin/Reporters/Http.php @@ -73,7 +73,7 @@ public function __construct( } /** - * @param ReadbackSpan[] $spans + * @param ReadbackSpan[]|array $spans * @return void */ public function report(array $spans): void @@ -83,6 +83,7 @@ public function report(array $spans): void } $payload = $this->serializer->serialize($spans); + if ($payload === false) { $this->logger->error( \sprintf('failed to encode spans with code %d', \json_last_error()) @@ -91,6 +92,7 @@ public function report(array $spans): void } $client = $this->clientFactory->build($this->options); + try { $client($payload); } catch (RuntimeException $e) { diff --git a/src/Zipkin/Reporters/Memcached.php b/src/Zipkin/Reporters/Memcached.php new file mode 100644 index 0000000..8279d22 --- /dev/null +++ b/src/Zipkin/Reporters/Memcached.php @@ -0,0 +1,252 @@ + 'zipkin_traces', + 'batch_interval' => 60, // In seconds + 'batch_size' => 100 + ]; + + /** + * @var array + */ + private $options; + + /** + * @var MemcachedClient + */ + private $memcachedClient; + + /** + * @var LoggerInterface + */ + private $logger; + + /** + * @var SpanSerializer + */ + private $serializer; + + /** + * @var Reporter + */ + private $reporter; + + /** + * @param array $options + * @param Reporter $reporter + * @param MemcachedClient $memcachedClient + * @param LoggerInterface $logger + * @param SpanSerializer $serializer + */ + public function __construct( + array $options, + Reporter $reporter, + MemcachedClient $memcachedClient = null, + LoggerInterface $logger = null, + SpanSerializer $serializer = null + ) { + $this->options = \array_merge(self::DEFAULT_OPTIONS, $options); + $this->reporter = $reporter; + $this->memcachedClient = $memcachedClient ?? new MemcachedClient(); + $this->logger = $logger ?? new NullLogger(); + $this->serializer = $serializer ?? new JsonV2Serializer(); + } + + /** + * @param array $spans + */ + public function report(array $spans): void + { + try { + $status = false; + $aggregatedSpans = []; + + $this->memcachedClient->ping(); + + while (!$status) { + $result = $this->memcachedClient->get( + sprintf("%s_spans", $this->options['cache_key_prefix']), + null, + MemcachedClient::GET_EXTENDED + ); + + if (empty($result)) { + $this->memcachedClient->set( + sprintf("%s_spans", $this->options['cache_key_prefix']), + serialize($spans) + ); + $this->memcachedClient->quit(); + return; + } + + $result['value'] = array_merge( + unserialize($result['value']), + $spans + ); + + // If batch reporting interval passed and enabled, send spans to zipkin server and reset the value + if ($this->isBatchIntervalPassed()) { + $aggregatedSpans = $result['value']; + $result['value'] = []; + $this->resetBatchInterval(); + } + + // If batch reporting size reached and enabled, send spans to zipkin server and reset the value + if (($this->options['batch_size'] > 0) && (count($result['value']) >= $this->options['batch_size'])) { + $aggregatedSpans = $result['value']; + $result['value'] = []; + } + + $status = $this->memcachedClient->compareAndSwap( + $result['cas'], + sprintf("%s_spans", $this->options['cache_key_prefix']), + serialize($result['value']) + ); + } + $this->memcachedClient->quit(); + } catch (Exception $e) { + $this->logger->error( + \sprintf('Error while calling memcached server: %s', $e->getMessage()) + ); + + // If memcached is down or there was any failure happened, send reported spans directly to zipkin + if (empty($aggregatedSpans) && !empty($spans)) { + $this->reporter->report($spans); + } + } finally { + // Send all aggregated spans only if reporting time reached and memcached value got cleared. + if ($status && !empty($aggregatedSpans)) { + $this->reporter->report($aggregatedSpans); + } + } + + return; + } + + /** + * @return array + */ + public function flush(): array + { + try { + $status = false; + + $this->memcachedClient->ping(); + + while (!$status) { + $result = $this->memcachedClient->get( + sprintf("%s_spans", $this->options['cache_key_prefix']), + null, + MemcachedClient::GET_EXTENDED + ); + + if (empty($result)) { + $this->memcachedClient->quit(); + return []; + } + + $status = $this->memcachedClient->compareAndSwap( + $result['cas'], + sprintf("%s_spans", $this->options['cache_key_prefix']), + serialize([]) + ); + } + + $this->memcachedClient->quit(); + + return unserialize($result['value']); + } catch (Exception $e) { + $this->logger->error( + \sprintf('Error while calling memcached server: %s', $e->getMessage()) + ); + } + + return []; + } + + /** + * @return boolean + */ + private function isBatchIntervalPassed(): bool + { + if ($this->options['batch_interval'] <= 0) { + return false; + } + + $result = $this->memcachedClient->get( + sprintf("%s_batch_ts", $this->options['cache_key_prefix']), + null, + MemcachedClient::GET_EXTENDED + ); + + if (empty($result)) { + return true; + } + + return (intval($result['value'] + $this->options['batch_interval']) <= time()); + } + + /** + * Reset Batch Interval + * + * @return bool + */ + private function resetBatchInterval(): bool + { + if ($this->options['batch_interval'] <= 0) { + return false; + } + + try { + $status = false; + + $this->memcachedClient->ping(); + + while (!$status) { + $result = $this->memcachedClient->get( + sprintf("%s_batch_ts", $this->options['cache_key_prefix']), + null, + MemcachedClient::GET_EXTENDED + ); + + if (empty($result)) { + $this->memcachedClient->set( + sprintf("%s_batch_ts", $this->options['cache_key_prefix']), + time() + ); + + $this->memcachedClient->quit(); + return true; + } + + $status = $this->memcachedClient->compareAndSwap( + $result['cas'], + sprintf("%s_batch_ts", $this->options['cache_key_prefix']), + time() + ); + } + + $this->memcachedClient->quit(); + } catch (Exception $e) { + $this->logger->error( + \sprintf('Error while calling memcached server: %s', $e->getMessage()) + ); + } + + return true; + } +} diff --git a/tests/Unit/Reporters/MemcachedTest.php b/tests/Unit/Reporters/MemcachedTest.php new file mode 100644 index 0000000..0bad40f --- /dev/null +++ b/tests/Unit/Reporters/MemcachedTest.php @@ -0,0 +1,372 @@ +getMethod($methodName); + $method->setAccessible(true); + + return $method->invokeArgs($object, $parameters); + } + + public function testReportError() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $logger = $this->createMock(LoggerInterface::class); + $memcached = new Memcached([], $httpReporter, $memcachedClient, $logger); + + $memcachedClient->expects($this->exactly(1)) + ->method('ping') + ->will($this->throwException(new Exception("Unable to connect"))); + + $logger->expects($this->exactly(1)) + ->method('error'); + + $httpReporter->expects($this->exactly(1)) + ->method('report') + ->with([new \stdClass()]); + + $memcached->report([new \stdClass()]); + } + + public function testReportSuccessWithoutAggregatedSpans() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $logger = $this->createMock(LoggerInterface::class); + $memcached = new Memcached([], $httpReporter, $memcachedClient, $logger); + + $memcachedClient->expects($this->exactly(1)) + ->method('ping') + ->willReturn(true); + + $memcachedClient->expects($this->exactly(1)) + ->method('get') + ->with('zipkin_traces_spans', null, MemcachedClient::GET_EXTENDED) + ->willReturn(null); + + + $memcachedClient->expects($this->exactly(1)) + ->method('set') + ->with('zipkin_traces_spans', serialize([new \stdClass()])); + + $memcachedClient->expects($this->exactly(1)) + ->method('quit') + ->willReturn(true); + + $memcached->report([new \stdClass()]); + } + + public function testReportSuccessWithSpans01() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $logger = $this->createMock(LoggerInterface::class); + $memcached = new Memcached([], $httpReporter, $memcachedClient, $logger); + + $memcachedClient->expects($this->exactly(1)) + ->method('ping') + ->willReturn(true); + + $memcachedClient->expects($this->exactly(4)) + ->method('get') + ->withConsecutive( + ['zipkin_traces_spans', null, MemcachedClient::GET_EXTENDED], + ['zipkin_traces_batch_ts', null, MemcachedClient::GET_EXTENDED], + ['zipkin_traces_spans', null, MemcachedClient::GET_EXTENDED], + ['zipkin_traces_batch_ts', null, MemcachedClient::GET_EXTENDED], + )->willReturnOnConsecutiveCalls( + ['cas' => 123, 'value' => serialize([new \stdClass()])], + ['cas' => 127, 'value' => time()], + ['cas' => 124, 'value' => serialize([new \stdClass()])], + ['cas' => 129, 'value' => time()] + ); + + $memcachedClient->expects($this->exactly(2)) + ->method('compareAndSwap') + ->withConsecutive( + ['123', 'zipkin_traces_spans', serialize([new \stdClass(), new \stdClass()])], + ['124', 'zipkin_traces_spans', serialize([new \stdClass(), new \stdClass()])] + )->willReturnOnConsecutiveCalls( + false, + true + ); + + $memcachedClient->expects($this->once()) + ->method('quit') + ->willReturn(true); + + $memcached->report([new \stdClass()]); + } + + public function testReportSuccessWithSpans02() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $logger = $this->createMock(LoggerInterface::class); + $memcached = new Memcached([], $httpReporter, $memcachedClient, $logger); + + $memcachedClient->expects($this->exactly(2)) + ->method('ping') + ->willReturn(true); + + $memcachedClient->expects($this->exactly(3)) + ->method('get') + ->withConsecutive( + ['zipkin_traces_spans', null, MemcachedClient::GET_EXTENDED], + ['zipkin_traces_batch_ts', null, MemcachedClient::GET_EXTENDED], + ['zipkin_traces_batch_ts', null, MemcachedClient::GET_EXTENDED], + )->willReturnOnConsecutiveCalls( + ['cas' => 123, 'value' => serialize([new \stdClass()])], + ['cas' => 127, 'value' => time() - 900], + null + ); + + $memcachedClient->expects($this->exactly(1)) + ->method('compareAndSwap') + ->with('123', 'zipkin_traces_spans', serialize([])) + ->willReturn(true); + + $memcachedClient->expects($this->exactly(2)) + ->method('quit') + ->willReturn(true); + + $httpReporter->expects($this->exactly(1)) + ->method('report') + ->with([new \stdClass(), new \stdClass()]); + + $memcached->report([new \stdClass()]); + } + + public function testFlushingOfOneSpanWithRetry() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $memcached = new Memcached([], $httpReporter, $memcachedClient); + + $memcachedClient->expects($this->once()) + ->method('ping') + ->willReturn(true); + + $memcachedClient->expects($this->exactly(2)) + ->method('get') + ->withConsecutive( + ['zipkin_traces_spans', null, MemcachedClient::GET_EXTENDED], + ['zipkin_traces_spans', null, MemcachedClient::GET_EXTENDED] + )->willReturnOnConsecutiveCalls( + ['cas' => 123, 'value' => serialize([new \stdClass()])], + ['cas' => 124, 'value' => serialize([new \stdClass()])] + ); + + $memcachedClient->expects($this->exactly(2)) + ->method('compareAndSwap') + ->withConsecutive( + ['123', 'zipkin_traces_spans', serialize([])], + ['124', 'zipkin_traces_spans', serialize([])] + )->willReturnOnConsecutiveCalls( + false, + true + ); + + $memcachedClient->expects($this->once()) + ->method('quit') + ->willReturn(true); + + $logger = $this->prophesize(LoggerInterface::class); + $logger->error()->shouldNotBeCalled(); + $this->assertEquals($memcached->flush(), [new \stdClass()]); + } + + public function testFlushingOfOneSpan() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $memcached = new Memcached([], $httpReporter, $memcachedClient); + + $memcachedClient->expects($this->exactly(1)) + ->method('ping') + ->willReturn(true); + + $memcachedClient->expects($this->exactly(1)) + ->method('get') + ->with('zipkin_traces_spans', null, MemcachedClient::GET_EXTENDED) + ->willReturn([ + 'cas' => 123, + 'value' => serialize([new \stdClass()]) + ]); + + $memcachedClient->expects($this->exactly(1)) + ->method('compareAndSwap') + ->with('123', 'zipkin_traces_spans', serialize([])) + ->willReturn(true); + + $memcachedClient->expects($this->exactly(1)) + ->method('quit') + ->willReturn(true); + + $logger = $this->prophesize(LoggerInterface::class); + $logger->error()->shouldNotBeCalled(); + $this->assertEquals($memcached->flush(), [new \stdClass()]); + } + + public function testFlushingOfZeroSpans() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $memcached = new Memcached([], $httpReporter, $memcachedClient); + + $memcachedClient->expects($this->exactly(1)) + ->method('ping') + ->willReturn(true); + + $memcachedClient->expects($this->exactly(1)) + ->method('get') + ->with('zipkin_traces_spans', null, MemcachedClient::GET_EXTENDED) + ->willReturn(false); + + $memcachedClient->expects($this->exactly(1)) + ->method('quit') + ->willReturn(true); + + $logger = $this->prophesize(LoggerInterface::class); + $logger->error()->shouldNotBeCalled(); + $this->assertEquals($memcached->flush(), []); + } + + public function testFlushingError() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $logger = $this->createMock(LoggerInterface::class); + $memcached = new Memcached([], $httpReporter, $memcachedClient, $logger); + + $memcachedClient->expects($this->exactly(1)) + ->method('ping') + ->will($this->throwException(new Exception("Unable to connect"))); + + $logger->expects($this->exactly(1)) + ->method('error'); + + $this->assertEquals($memcached->flush(), []); + } + + public function testDisabledPatchInterval() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $memcached = new Memcached(['batch_interval' => -1], $httpReporter, $memcachedClient); + + $this->assertEquals( + $this->invokeMethod($memcached, 'isBatchIntervalPassed', []), + false + ); + } + + public function testEnabledPatchInterval01() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $memcached = new Memcached(['batch_interval' => 60], $httpReporter, $memcachedClient); + + $memcachedClient->expects($this->exactly(1)) + ->method('get') + ->with('zipkin_traces_batch_ts', null, MemcachedClient::GET_EXTENDED) + ->willReturn(null); + + $this->assertEquals( + $this->invokeMethod($memcached, 'isBatchIntervalPassed', []), + true + ); + } + + public function testEnabledPatchInterval02() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $memcached = new Memcached(['batch_interval' => 60], $httpReporter, $memcachedClient); + + $memcachedClient->expects($this->exactly(1)) + ->method('get') + ->with('zipkin_traces_batch_ts', null, MemcachedClient::GET_EXTENDED) + ->willReturn([ + 'cas' => 123, + 'value' => time() - 90 + ]); + + $this->assertEquals( + $this->invokeMethod($memcached, 'isBatchIntervalPassed', []), + true + ); + } + + public function testEnabledPatchInterval03() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $memcached = new Memcached(['batch_interval' => 60], $httpReporter, $memcachedClient); + + $memcachedClient->expects($this->exactly(1)) + ->method('get') + ->with('zipkin_traces_batch_ts', null, MemcachedClient::GET_EXTENDED) + ->willReturn([ + 'cas' => 123, + 'value' => time() - 40 + ]); + + $this->assertEquals( + $this->invokeMethod($memcached, 'isBatchIntervalPassed', []), + false + ); + } + + public function testResetBatchInterval() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $memcached = new Memcached(['batch_interval' => -1], $httpReporter, $memcachedClient); + + $this->assertEquals( + $this->invokeMethod($memcached, 'resetBatchInterval', []), + false + ); + } + + public function testResetBatchIntervalError() + { + $memcachedClient = $this->createMock(MemcachedClient::class); + $httpReporter = $this->createMock(Reporter::class); + $logger = $this->createMock(LoggerInterface::class); + $memcached = new Memcached([], $httpReporter, $memcachedClient, $logger); + + $memcachedClient->expects($this->exactly(1)) + ->method('ping') + ->will($this->throwException(new Exception("Unable to connect"))); + + $logger->expects($this->exactly(1)) + ->method('error'); + + $this->assertEquals( + $this->invokeMethod($memcached, 'resetBatchInterval', []), + true + ); + } +}