From 8bde2dc357bbbf8799295febbce30c15f1bfab4d Mon Sep 17 00:00:00 2001 From: sergkash7 <55360924+sergkash7@users.noreply.github.com> Date: Tue, 27 Jun 2023 11:17:24 +0300 Subject: [PATCH] Added use of amphp/pipeline instead of amphp/sync --- app/Commands/RunCommand.php | 5 +- app/Processor.php | 119 +++++++++++++++++++++--------------- app/Sample.php | 20 ++++++ app/Sender.php | 55 ++++++----------- composer.json | 4 +- 5 files changed, 112 insertions(+), 91 deletions(-) create mode 100644 app/Sample.php diff --git a/app/Commands/RunCommand.php b/app/Commands/RunCommand.php index db493bc..4673080 100644 --- a/app/Commands/RunCommand.php +++ b/app/Commands/RunCommand.php @@ -3,7 +3,6 @@ namespace Zoon\PyroSpy\Commands; use InvalidArgumentException; -use Revolt\EventLoop; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputDefinition; use Symfony\Component\Console\Input\InputInterface; @@ -141,12 +140,12 @@ protected function execute(InputInterface $input, OutputInterface $output): int $processor = new Processor( $interval, $batch, - new Sender($pyroscope, $app, $rateHz, $tags, $concurrentRequestLimit), + new Sender($pyroscope, $app, $rateHz, $tags), array_values(array_filter($plugins)), $sendSampleFutureLimit, + $concurrentRequestLimit, ); $processor->process(); - EventLoop::run(); return Command::SUCCESS; } diff --git a/app/Processor.php b/app/Processor.php index 9c79a68..b6d4b00 100644 --- a/app/Processor.php +++ b/app/Processor.php @@ -2,12 +2,13 @@ namespace Zoon\PyroSpy; -use Amp\Sync\LocalSemaphore; -use Amp\Sync\Semaphore; +use Amp\Future; +use Amp\Pipeline\Queue; use Generator; use InvalidArgumentException; use Throwable; use Zoon\PyroSpy\Plugins\PluginInterface; +use function Amp\async; use function Amp\ByteStream\getStdin; use function Amp\ByteStream\splitLines; @@ -19,7 +20,8 @@ final class Processor { * @var array> */ private array $results; - private readonly Semaphore $sendSampleFutureLimit; + /** @var Queue */ + private readonly Queue $queue; /** * @param list $plugins @@ -30,9 +32,10 @@ public function __construct( private readonly Sender $sender, private readonly array $plugins, int $sendSampleFutureLimit, + private readonly int $concurrentRequestLimit, ) { $this->init(); - $this->sendSampleFutureLimit = new LocalSemaphore($sendSampleFutureLimit); + $this->queue = new Queue($sendSampleFutureLimit); } private function init(): void { @@ -42,39 +45,78 @@ private function init(): void { } public function process(): void { - $sample = []; + Future\await([ + $this->runProducer(), + $this->runConsumer(), + ]); + } - foreach (self::getLine() as $line) { - $isEndOfTrace = $line === ''; + private function runProducer(): Future { + return async(function (): void { + $sample = []; - if (!$isEndOfTrace) { - $sample[] = $line; - } + foreach (self::getLine() as $line) { + $isEndOfTrace = $line === ''; - if ($isEndOfTrace && count($sample) > 0) { - try { - $tags = self::extractTags($sample); - $samplePrepared = self::prepareSample($sample); - self::checkSample($samplePrepared); - } catch (Throwable $e) { - echo $e->getMessage() . PHP_EOL; - var_dump($sample); - $sample = []; - continue; + if (!$isEndOfTrace) { + $sample[] = $line; } - foreach ($this->plugins as $plugin) { - [$tags, $samplePrepared] = $plugin->process($tags, $samplePrepared); + if ($isEndOfTrace && count($sample) > 0) { + try { + try { + $tags = self::extractTags($sample); + $samplePrepared = self::prepareSample($sample); + self::checkSample($samplePrepared); + } catch (Throwable $e) { + echo $e->getMessage() . PHP_EOL; + var_dump($sample); + continue; + } + + foreach ($this->plugins as $plugin) { + [$tags, $samplePrepared] = $plugin->process($tags, $samplePrepared); + } + $key = self::stringifyTrace($samplePrepared); + $this->groupTrace($tags, $key); + + $currentTime = time(); + + if ($currentTime < $this->tsEnd && $this->countResults() < $this->batchLimit) { + continue; + } + + foreach ($this->results as $tagSerialized => $results) { + $this->queue->push(new Sample($this->tsStart, $currentTime, $results, unserialize($tagSerialized))); + } + + $this->init(); + } finally { + $sample = []; + } } - $key = self::stringifyTrace($samplePrepared); - $this->groupTrace($tags, $key); - $this->sendResults(); + } - $sample = []; + $currentTime = time(); + foreach ($this->results as $tagSerialized => $results) { + $this->queue->push(new Sample($this->tsStart, $currentTime, $results, unserialize($tagSerialized))); } - } - $this->sendResults(true); + $this->queue->complete(); + }); + } + + private function runConsumer(): Future { + return async(function (): void { + $this + ->queue + ->pipe() + ->unordered() + ->concurrent($this->concurrentRequestLimit) + ->forEach(function (Sample $sample): void { + $this->sender->sendSample($sample); + }); + }); } /** @@ -174,27 +216,6 @@ private function groupTrace(array $tags, string $key): void { $this->results[$tagsKey][$key]++; } - private function sendResults(bool $force = false): void { - $currentTime = time(); - if (!$force && $currentTime < $this->tsEnd && $this->countResults() < $this->batchLimit) { - return; - } - - $tsStart = $this->tsStart; - foreach ($this->results as $tagSerialized => $results) { - $futureLock = $this->sendSampleFutureLimit->acquire(); - $this - ->sender - ->sendSample($tsStart, $currentTime, $results, unserialize($tagSerialized)) - ->finally(static function () use ($futureLock): void { - $futureLock->release(); - }) - ; - } - - $this->init(); - } - private function countResults(): int { $count = 0; foreach ($this->results as $tagResuts) { diff --git a/app/Sample.php b/app/Sample.php new file mode 100644 index 0000000..ebc3edb --- /dev/null +++ b/app/Sample.php @@ -0,0 +1,20 @@ + $samples + * @param array $tags + */ + public function __construct( + public readonly int $fromTs, + public readonly int $toTs, + public readonly array $samples, + public readonly array $tags, + ) { + } +} \ No newline at end of file diff --git a/app/Sender.php b/app/Sender.php index 5731137..b86699d 100644 --- a/app/Sender.php +++ b/app/Sender.php @@ -2,18 +2,13 @@ namespace Zoon\PyroSpy; -use Amp\Future; use Amp\Http\Client\HttpClient; use Amp\Http\Client\HttpClientBuilder; use Amp\Http\Client\Request; -use Amp\Sync\LocalSemaphore; -use function Amp\async; -use function Amp\delay; final class Sender { private readonly HttpClient $client; - private readonly LocalSemaphore $concurrentRequestLimit; /** * @param array $tags @@ -23,47 +18,33 @@ public function __construct( private readonly string $appName, private readonly int $rateHz, private readonly array $tags, - int $concurrentRequestLimit, ) { $this->client = (new HttpClientBuilder()) ->retry(0) ->followRedirects(0) ->build() ; - $this->concurrentRequestLimit = new LocalSemaphore($concurrentRequestLimit); } - /** - * @param array $samples - * @param array $tags - * @return Future - */ - public function sendSample(int $fromTs, int $toTs, array $samples, array $tags): Future { - return async(function () use ($fromTs, $toTs, $samples, $tags) { - $lock = $this->concurrentRequestLimit->acquire(); - try { - $url = $this->getUrl($tags, $fromTs, $toTs); - try { - $request = new Request($url, 'POST', self::prepareBody($samples)); - $request->setTcpConnectTimeout(5 * 60); - $request->setTlsHandshakeTimeout(5 * 60); - $request->setTransferTimeout(60 * 60); - $request->setInactivityTimeout(60 * 60); - $response = $this->client->request($request); - if ($response->getStatus() === 200) { - return true; - } else { - printf("\nerror on request to url '%s', status code: %s", $url, $response->getStatus()); - return false; - } - } catch (\Throwable $exception) { - printf("\nerror on request to url '%s', exception message: %s", $url, $exception->getMessage()); - return false; - } - } finally { - $lock->release(); + public function sendSample(Sample $sample): bool { + $url = $this->getUrl($sample->tags, $sample->fromTs, $sample->toTs); + try { + $request = new Request($url, 'POST', self::prepareBody($sample->samples)); + $request->setTcpConnectTimeout(5 * 60); + $request->setTlsHandshakeTimeout(5 * 60); + $request->setTransferTimeout(60 * 60); + $request->setInactivityTimeout(60 * 60); + $response = $this->client->request($request); + if ($response->getStatus() === 200) { + return true; + } else { + printf("\nerror on request to url '%s', status code: %s", $url, $response->getStatus()); + return false; } - }); + } catch (\Throwable $exception) { + printf("\nerror on request to url '%s', exception message: %s", $url, $exception->getMessage()); + return false; + } } /** diff --git a/composer.json b/composer.json index fb8662b..88eaf20 100644 --- a/composer.json +++ b/composer.json @@ -18,9 +18,9 @@ "amphp/amp": "^3.0", "revolt/event-loop": "^1.0", "amphp/http-client": "^5", - "amphp/sync": "^2.0", "amphp/byte-stream": "^2.0", - "symfony/console": "^5|^6" + "symfony/console": "^5|^6", + "amphp/pipeline": "^1.0" }, "autoload": { "psr-4": {