Skip to content

Commit

Permalink
Improvements on the Reindex API (#1752)
Browse files Browse the repository at this point in the history
Reindex improvements:

- Add pipeline support
    - Add `Reindex::PIPELINE`
    - Add `$reindex->setPipeline(Pipeline $pipeline)`
- Improve refresh support
    - Add `$reindex->setRefresh(bool|string $value)`
- Improve query support
    - Add `$reindex->setQuery(AbstractQuery $query)`
- Add slices support
    - Add `Reindex::SLICES`
    - Add `Reindex::SLICES_AUTO`

... and also:

- Add `$pipeline->getId()`
  • Loading branch information
renanbr authored Feb 12, 2020
1 parent 5e8cea1 commit 8544a91
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 0 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased](https://github.com/ruflin/Elastica/compare/7.0.0-beta.3...master)
### Backward Compatibility Breaks
### Added
* Added `Elastica\Reindex->setPipeline(Elastica\Pipeline $pipeline): void`. The link between the reindex and the pipeline is solved when `run()` is called, and thus the pipeline given doesn't need to be created before calling `setPipeline()` [#1752](https://github.com/ruflin/Elastica/pull/1752)
* Added `Elastica\Reindex->setRefresh(bool|string $value): void`. It accepts boolean and `REFRESH_*` constants from its class [#1752](https://github.com/ruflin/Elastica/pull/1752)
* Added `Elastica\Reindex->setQuery(Elastica\Query\AbstractQuery $query): void` [#1752](https://github.com/ruflin/Elastica/pull/1752)
* Added constants `PIPELINE`, `REFRESH_TRUE`, `REFRESH_FALSE`, `REFRESH_WAIT_FOR`, `SLICES` and `SLICES_AUTO` to `Elastica\Reindex` [#1752](https://github.com/ruflin/Elastica/pull/1752)
* Added `Elastica\Pipeline->getId(): ?string` [#1752](https://github.com/ruflin/Elastica/pull/1752)

### Changed
- Require elastica-php library >= v7.1.1, fixes an issue on Ingestion/Put() type-hinting
- Require guzzle >= v6.3 as development library: fixes issues on PHP >= 7.2
Expand Down
5 changes: 5 additions & 0 deletions lib/Elastica/Pipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ public function setId(string $id): self
return $this;
}

public function getId(): ?string
{
return $this->id;
}

/**
* @param AbstractProcessor[]
*/
Expand Down
33 changes: 33 additions & 0 deletions lib/Elastica/Reindex.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@ class Reindex extends Param
public const REMOTE = 'remote';
public const SLICE = 'slice';
public const REFRESH = 'refresh';
public const REFRESH_TRUE = 'true';
public const REFRESH_FALSE = 'false';
public const REFRESH_WAIT_FOR = 'wait_for';
public const WAIT_FOR_COMPLETION = 'wait_for_completion';
public const WAIT_FOR_COMPLETION_FALSE = 'false';
public const WAIT_FOR_ACTIVE_SHARDS = 'wait_for_active_shards';
public const TIMEOUT = 'timeout';
public const SCROLL = 'scroll';
public const REQUESTS_PER_SECOND = 'requests_per_second';
public const PIPELINE = 'pipeline';
public const SLICES = 'slices';
public const SLICES_AUTO = 'auto';

/**
* @var Index
Expand Down Expand Up @@ -96,6 +102,12 @@ protected function _getDestPartBody(Index $index, array $params): array
'index' => $index->getName(),
], $this->_resolveDestOptions($params));

// Resolves the pipeline name
$pipeline = $destBody[self::PIPELINE] ?? null;
if ($pipeline instanceof Pipeline) {
$destBody[self::PIPELINE] = $pipeline->getId();
}

return $destBody;
}

Expand All @@ -115,6 +127,7 @@ private function _resolveDestOptions(array $params): array
return \array_intersect_key($params, [
self::VERSION_TYPE => null,
self::OPERATION_TYPE => null,
self::PIPELINE => null,
]);
}

Expand Down Expand Up @@ -184,6 +197,26 @@ public function setScript(Script $script)
$this->setParam(self::SCRIPT, $script);
}

public function setQuery(AbstractQuery $query): void
{
$this->setParam(self::QUERY, $query);
}

public function setPipeline(Pipeline $pipeline): void
{
$this->setParam(self::PIPELINE, $pipeline);
}

/**
* @param bool|string $value
*/
public function setRefresh($value): void
{
\is_bool($value) && $value = $value ? self::REFRESH_TRUE : self::REFRESH_FALSE;

$this->setParam(self::REFRESH, $value);
}

public function getTaskId()
{
$taskId = null;
Expand Down
55 changes: 55 additions & 0 deletions test/Elastica/ReindexTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@
use Elastica\Document;
use Elastica\Exception\ResponseException;
use Elastica\Index;
use Elastica\Pipeline;
use Elastica\Processor\Rename;
use Elastica\Processor\Uppercase;
use Elastica\Query\Match;
use Elastica\Reindex;
use Elastica\Script\Script;
Expand Down Expand Up @@ -196,6 +199,58 @@ public function testReindexWithRemote()
}
}

/**
* @group functional
*/
public function testReindexWithPipeline(): void
{
$oldIndex = $this->_createIndex('idx1', true, 2);
$this->_addDocs($oldIndex, 10);

$newIndex = $this->_createIndex('idx2', true, 2);

$pipeline = new Pipeline($newIndex->getClient());
$pipeline->setId('my-pipeline');
$pipeline->setDescription('For testing purposes"');
$pipeline->addProcessor(new Rename('id', 'identifier'));
$pipeline->addProcessor(new Uppercase('key'));

$reindex = new Reindex($oldIndex, $newIndex);
$reindex->setPipeline($pipeline);

$pipeline->create();
$reindex->run();
$newIndex->refresh();

$results = $newIndex->search()->getResults();
$this->assertEquals(10, $newIndex->count());

foreach ($results as $result) {
$this->assertArrayNotHasKey('id', $result->getData());
$this->assertArrayHasKey('identifier', $result->getData());
$this->assertSame('VALUE', $result->getData()['key']);
}
}

/**
* @group functional
*/
public function testReindexWithRefresh(): void
{
$oldIndex = $this->_createIndex('idx1', true, 2);
$this->_addDocs($oldIndex, 10);

$newIndex = $this->_createIndex('idx2', true, 2);

$reindex = new Reindex($oldIndex, $newIndex);
$reindex->setRefresh(true);

$reindex->run();

$newIndex->search()->getResults();
$this->assertEquals(10, $newIndex->count());
}

private function _addDocs(Index $index, int $docs): array
{
$insert = [];
Expand Down

0 comments on commit 8544a91

Please sign in to comment.