From 8544a912fb59e49a0cbb123f46f0b01582fbcecb Mon Sep 17 00:00:00 2001 From: Renan Date: Wed, 12 Feb 2020 14:40:36 +0100 Subject: [PATCH] Improvements on the Reindex API (#1752) Reindex improvements: - Add pipeline support - Add `Reindex::PIPELINE` - Add `$reindex->setPipeline(Pipeline $pipeline)` - Improve refresh support - Add `$reindex->setRefresh(bool|string $value)` - Improve query support - Add `$reindex->setQuery(AbstractQuery $query)` - Add slices support - Add `Reindex::SLICES` - Add `Reindex::SLICES_AUTO` ... and also: - Add `$pipeline->getId()` --- CHANGELOG.md | 6 ++++ lib/Elastica/Pipeline.php | 5 ++++ lib/Elastica/Reindex.php | 33 +++++++++++++++++++++ test/Elastica/ReindexTest.php | 55 +++++++++++++++++++++++++++++++++++ 4 files changed, 99 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21b87fee43..8727610d5c 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased](https://github.com/ruflin/Elastica/compare/7.0.0-beta.3...master) ### Backward Compatibility Breaks ### Added +* Added `Elastica\Reindex->setPipeline(Elastica\Pipeline $pipeline): void`. The link between the reindex and the pipeline is solved when `run()` is called, and thus the pipeline given doesn't need to be created before calling `setPipeline()` [#1752](https://github.com/ruflin/Elastica/pull/1752) +* Added `Elastica\Reindex->setRefresh(bool|string $value): void`. It accepts boolean and `REFRESH_*` constants from its class [#1752](https://github.com/ruflin/Elastica/pull/1752) +* Added `Elastica\Reindex->setQuery(Elastica\Query\AbstractQuery $query): void` [#1752](https://github.com/ruflin/Elastica/pull/1752) +* Added constants `PIPELINE`, `REFRESH_TRUE`, `REFRESH_FALSE`, `REFRESH_WAIT_FOR`, `SLICES` and `SLICES_AUTO` to `Elastica\Reindex` [#1752](https://github.com/ruflin/Elastica/pull/1752) +* Added `Elastica\Pipeline->getId(): ?string` [#1752](https://github.com/ruflin/Elastica/pull/1752) + ### Changed - Require elastica-php library >= v7.1.1, fixes an issue on Ingestion/Put() type-hinting - Require guzzle >= v6.3 as development library: fixes issues on PHP >= 7.2 diff --git a/lib/Elastica/Pipeline.php b/lib/Elastica/Pipeline.php index 3175f2878c..71c6c6e07a 100644 --- a/lib/Elastica/Pipeline.php +++ b/lib/Elastica/Pipeline.php @@ -123,6 +123,11 @@ public function setId(string $id): self return $this; } + public function getId(): ?string + { + return $this->id; + } + /** * @param AbstractProcessor[] */ diff --git a/lib/Elastica/Reindex.php b/lib/Elastica/Reindex.php index a35d73f1b0..17d5e7bc09 100644 --- a/lib/Elastica/Reindex.php +++ b/lib/Elastica/Reindex.php @@ -23,12 +23,18 @@ class Reindex extends Param public const REMOTE = 'remote'; public const SLICE = 'slice'; public const REFRESH = 'refresh'; + public const REFRESH_TRUE = 'true'; + public const REFRESH_FALSE = 'false'; + public const REFRESH_WAIT_FOR = 'wait_for'; public const WAIT_FOR_COMPLETION = 'wait_for_completion'; public const WAIT_FOR_COMPLETION_FALSE = 'false'; public const WAIT_FOR_ACTIVE_SHARDS = 'wait_for_active_shards'; public const TIMEOUT = 'timeout'; public const SCROLL = 'scroll'; public const REQUESTS_PER_SECOND = 'requests_per_second'; + public const PIPELINE = 'pipeline'; + public const SLICES = 'slices'; + public const SLICES_AUTO = 'auto'; /** * @var Index @@ -96,6 +102,12 @@ protected function _getDestPartBody(Index $index, array $params): array 'index' => $index->getName(), ], $this->_resolveDestOptions($params)); + // Resolves the pipeline name + $pipeline = $destBody[self::PIPELINE] ?? null; + if ($pipeline instanceof Pipeline) { + $destBody[self::PIPELINE] = $pipeline->getId(); + } + return $destBody; } @@ -115,6 +127,7 @@ private function _resolveDestOptions(array $params): array return \array_intersect_key($params, [ self::VERSION_TYPE => null, self::OPERATION_TYPE => null, + self::PIPELINE => null, ]); } @@ -184,6 +197,26 @@ public function setScript(Script $script) $this->setParam(self::SCRIPT, $script); } + public function setQuery(AbstractQuery $query): void + { + $this->setParam(self::QUERY, $query); + } + + public function setPipeline(Pipeline $pipeline): void + { + $this->setParam(self::PIPELINE, $pipeline); + } + + /** + * @param bool|string $value + */ + public function setRefresh($value): void + { + \is_bool($value) && $value = $value ? self::REFRESH_TRUE : self::REFRESH_FALSE; + + $this->setParam(self::REFRESH, $value); + } + public function getTaskId() { $taskId = null; diff --git a/test/Elastica/ReindexTest.php b/test/Elastica/ReindexTest.php index 7a4f558afc..69a859f8e2 100644 --- a/test/Elastica/ReindexTest.php +++ b/test/Elastica/ReindexTest.php @@ -5,6 +5,9 @@ use Elastica\Document; use Elastica\Exception\ResponseException; use Elastica\Index; +use Elastica\Pipeline; +use Elastica\Processor\Rename; +use Elastica\Processor\Uppercase; use Elastica\Query\Match; use Elastica\Reindex; use Elastica\Script\Script; @@ -196,6 +199,58 @@ public function testReindexWithRemote() } } + /** + * @group functional + */ + public function testReindexWithPipeline(): void + { + $oldIndex = $this->_createIndex('idx1', true, 2); + $this->_addDocs($oldIndex, 10); + + $newIndex = $this->_createIndex('idx2', true, 2); + + $pipeline = new Pipeline($newIndex->getClient()); + $pipeline->setId('my-pipeline'); + $pipeline->setDescription('For testing purposes"'); + $pipeline->addProcessor(new Rename('id', 'identifier')); + $pipeline->addProcessor(new Uppercase('key')); + + $reindex = new Reindex($oldIndex, $newIndex); + $reindex->setPipeline($pipeline); + + $pipeline->create(); + $reindex->run(); + $newIndex->refresh(); + + $results = $newIndex->search()->getResults(); + $this->assertEquals(10, $newIndex->count()); + + foreach ($results as $result) { + $this->assertArrayNotHasKey('id', $result->getData()); + $this->assertArrayHasKey('identifier', $result->getData()); + $this->assertSame('VALUE', $result->getData()['key']); + } + } + + /** + * @group functional + */ + public function testReindexWithRefresh(): void + { + $oldIndex = $this->_createIndex('idx1', true, 2); + $this->_addDocs($oldIndex, 10); + + $newIndex = $this->_createIndex('idx2', true, 2); + + $reindex = new Reindex($oldIndex, $newIndex); + $reindex->setRefresh(true); + + $reindex->run(); + + $newIndex->search()->getResults(); + $this->assertEquals(10, $newIndex->count()); + } + private function _addDocs(Index $index, int $docs): array { $insert = [];