From 6abbf4066f9e0f7bf87b8b0f9bbfcb83021ba5ae Mon Sep 17 00:00:00 2001 From: Verdan Mahmood Date: Mon, 26 Jul 2021 16:38:00 +0200 Subject: [PATCH 1/3] Upgrades ES to 7.13.3 Signed-off-by: verdan --- .../publisher/elasticsearch_publisher.py | 3 ++- databuilder/requirements.txt | 2 +- docker-amundsen-local.yml | 4 +++- docker-amundsen.yml | 4 +++- search/requirements.txt | 4 ++-- search/search_service/proxy/elasticsearch.py | 13 ++++++++++++- 6 files changed, 23 insertions(+), 7 deletions(-) diff --git a/databuilder/databuilder/publisher/elasticsearch_publisher.py b/databuilder/databuilder/publisher/elasticsearch_publisher.py index b024844bc1..4eb7cd3045 100644 --- a/databuilder/databuilder/publisher/elasticsearch_publisher.py +++ b/databuilder/databuilder/publisher/elasticsearch_publisher.py @@ -90,7 +90,8 @@ def publish_impl(self) -> None: cnt = 0 # create new index with mapping - self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping) + self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping, + params={'include_type_name': 'true'}) for action in actions: index_row = dict(index=dict(_index=self.elasticsearch_new_index, _type=self.elasticsearch_type)) diff --git a/databuilder/requirements.txt b/databuilder/requirements.txt index e885aaa191..f652b1c4f1 100644 --- a/databuilder/requirements.txt +++ b/databuilder/requirements.txt @@ -1,7 +1,7 @@ # Copyright Contributors to the Amundsen project. # SPDX-License-Identifier: Apache-2.0 -elasticsearch>=6.2.0,<7.0 +elasticsearch>=6.2.0,<7.13.4 neo4j-driver>=1.7.2,<2.0 requests>=2.25.0,<3.0 diff --git a/docker-amundsen-local.yml b/docker-amundsen-local.yml index fdda8cc9ee..8129d0ed58 100644 --- a/docker-amundsen-local.yml +++ b/docker-amundsen-local.yml @@ -18,7 +18,7 @@ services: networks: - amundsennet elasticsearch: - image: elasticsearch:6.7.0 + image: elasticsearch:7.13.3 container_name: es_amundsen ports: - 9200:9200 @@ -28,6 +28,8 @@ services: nofile: soft: 65536 hard: 65536 + environment: + - discovery.type=single-node amundsensearch: build: context: . diff --git a/docker-amundsen.yml b/docker-amundsen.yml index 3dd7365cc7..6fa73711f7 100644 --- a/docker-amundsen.yml +++ b/docker-amundsen.yml @@ -20,7 +20,7 @@ services: networks: - amundsennet elasticsearch: - image: elasticsearch:6.7.0 + image: elasticsearch:7.13.3 container_name: es_amundsen ports: - 9200:9200 @@ -32,6 +32,8 @@ services: nofile: soft: 65536 hard: 65536 + environment: + - discovery.type=single-node amundsensearch: image: amundsendev/amundsen-search:2.4.1 container_name: amundsensearch diff --git a/search/requirements.txt b/search/requirements.txt index e67d257a13..9078ef2158 100644 --- a/search/requirements.txt +++ b/search/requirements.txt @@ -2,5 +2,5 @@ # SPDX-License-Identifier: Apache-2.0 pyatlasclient==1.0.3 -elasticsearch==6.8.2 -elasticsearch-dsl==6.4.0 +elasticsearch==7.13.3 +elasticsearch-dsl==7.4.0 diff --git a/search/search_service/proxy/elasticsearch.py b/search/search_service/proxy/elasticsearch.py index 5233d8ccb2..bc9a3cb8af 100644 --- a/search/search_service/proxy/elasticsearch.py +++ b/search/search_service/proxy/elasticsearch.py @@ -13,6 +13,7 @@ from elasticsearch import Elasticsearch from elasticsearch.exceptions import NotFoundError from elasticsearch_dsl import Search, query +from elasticsearch_dsl.utils import AttrDict from flask import current_app from search_service import config @@ -284,7 +285,14 @@ def _get_search_result(self, page_index: int, except Exception: LOGGING.exception('The record doesnt contain specified field.') - return search_result_model(total_results=response.hits.total, + # This is to support ESv7.x, and newer version of elasticsearch_dsl + if isinstance(response.hits.total, AttrDict): + _total = response.hits.total.value + _total = response.hits.total.get("value") + else: + _total = response.hits.total + + return search_result_model(total_results=_total, results=results) def _get_instance(self, attr: str, val: Any) -> Any: @@ -311,6 +319,9 @@ def _search_helper(self, page_index: int, :param query_name: name of query to query the ES :return: """ + # This is to support ESv7.x + # ref: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/breaking-changes-7.0.html#track-total-hits-10000-default # noqa: E501 + client = client.extra(track_total_hits=True) if query_name: q = query.Q(query_name) From 39de198b5377879bf0861aded43086c394731352 Mon Sep 17 00:00:00 2001 From: verdan Date: Mon, 26 Jul 2021 16:41:33 +0200 Subject: [PATCH 2/3] typo Signed-off-by: verdan --- search/search_service/proxy/elasticsearch.py | 1 - 1 file changed, 1 deletion(-) diff --git a/search/search_service/proxy/elasticsearch.py b/search/search_service/proxy/elasticsearch.py index bc9a3cb8af..969dfbbe4c 100644 --- a/search/search_service/proxy/elasticsearch.py +++ b/search/search_service/proxy/elasticsearch.py @@ -288,7 +288,6 @@ def _get_search_result(self, page_index: int, # This is to support ESv7.x, and newer version of elasticsearch_dsl if isinstance(response.hits.total, AttrDict): _total = response.hits.total.value - _total = response.hits.total.get("value") else: _total = response.hits.total From 7ce5a9c4ecef30014f2d21a909ff28bb81f6ca20 Mon Sep 17 00:00:00 2001 From: verdan Date: Tue, 27 Jul 2021 13:50:07 +0200 Subject: [PATCH 3/3] Fixes the test cases, and code review Signed-off-by: verdan --- databuilder/requirements.txt | 2 +- .../publisher/test_elasticsearch_publisher.py | 6 ++- search/search_service/config.py | 3 +- search/search_service/proxy/elasticsearch.py | 6 +-- search/tests/unit/proxy/test_elasticsearch.py | 42 +++++++++++++------ 5 files changed, 39 insertions(+), 20 deletions(-) diff --git a/databuilder/requirements.txt b/databuilder/requirements.txt index f652b1c4f1..8439187f59 100644 --- a/databuilder/requirements.txt +++ b/databuilder/requirements.txt @@ -1,7 +1,7 @@ # Copyright Contributors to the Amundsen project. # SPDX-License-Identifier: Apache-2.0 -elasticsearch>=6.2.0,<7.13.4 +elasticsearch>=6.2.0,<8.0 neo4j-driver>=1.7.2,<2.0 requests>=2.25.0,<3.0 diff --git a/databuilder/tests/unit/publisher/test_elasticsearch_publisher.py b/databuilder/tests/unit/publisher/test_elasticsearch_publisher.py index 22b00b3c7f..bdcc23e63f 100644 --- a/databuilder/tests/unit/publisher/test_elasticsearch_publisher.py +++ b/databuilder/tests/unit/publisher/test_elasticsearch_publisher.py @@ -69,7 +69,8 @@ def test_publish_with_data_and_no_old_index(self) -> None: # ensure indices create endpoint was called default_mapping = ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING self.mock_es_client.indices.create.assert_called_once_with(index=self.test_es_new_index, - body=default_mapping) + body=default_mapping, + params={'include_type_name': 'true'}) # bulk endpoint called once self.mock_es_client.bulk.assert_called_once_with( @@ -102,7 +103,8 @@ def test_publish_with_data_and_old_index(self) -> None: # ensure indices create endpoint was called default_mapping = ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING self.mock_es_client.indices.create.assert_called_once_with(index=self.test_es_new_index, - body=default_mapping) + body=default_mapping, + params={'include_type_name': 'true'}) # bulk endpoint called once self.mock_es_client.bulk.assert_called_once_with( diff --git a/search/search_service/config.py b/search/search_service/config.py index 78244d902f..78e880307f 100644 --- a/search/search_service/config.py +++ b/search/search_service/config.py @@ -2,6 +2,7 @@ # SPDX-License-Identifier: Apache-2.0 import os +from typing import Any, Optional ELASTICSEARCH_INDEX_KEY = 'ELASTICSEARCH_INDEX' SEARCH_PAGE_SIZE_KEY = 'SEARCH_PAGE_SIZE' @@ -47,7 +48,7 @@ class LocalConfig(Config): PORT=PROXY_PORT) ) PROXY_CLIENT = PROXY_CLIENTS[os.environ.get('PROXY_CLIENT', 'ELASTICSEARCH')] - PROXY_CLIENT_KEY = os.environ.get('PROXY_CLIENT_KEY') + PROXY_CLIENT_KEY = os.environ.get('PROXY_CLIENT_KEY') # type: Optional[Any] PROXY_USER = os.environ.get('CREDENTIALS_PROXY_USER', 'elastic') PROXY_PASSWORD = os.environ.get('CREDENTIALS_PROXY_PASSWORD', 'elastic') diff --git a/search/search_service/proxy/elasticsearch.py b/search/search_service/proxy/elasticsearch.py index 969dfbbe4c..ece3971d48 100644 --- a/search/search_service/proxy/elasticsearch.py +++ b/search/search_service/proxy/elasticsearch.py @@ -726,7 +726,7 @@ def _build_delete_actions(self, data: List[str], index_key: str, type: str) -> L return [{'delete': {'_index': index_key, '_id': id, '_type': type}} for id in data] def _bulk_helper(self, actions: List[Dict[str, Any]]) -> None: - result = self.elasticsearch.bulk(actions) + result = self.elasticsearch.bulk(body=actions) if result['errors']: # ES's error messages are nested within elasticsearch objects and can @@ -742,7 +742,7 @@ def _fetch_old_index(self, alias: str) -> List[str]: :return: list of elasticsearch indices """ try: - indices = self.elasticsearch.indices.get_alias(alias).keys() + indices = self.elasticsearch.indices.get_alias(index=alias).keys() return indices except NotFoundError: LOGGING.warn('Received index not found error from Elasticsearch', exc_info=True) @@ -767,5 +767,5 @@ def _get_mapping(alias: str) -> str: # alias our new index index_actions = {'actions': [{'add': {'index': index_key, 'alias': alias}}]} - self.elasticsearch.indices.update_aliases(index_actions) + self.elasticsearch.indices.update_aliases(body=index_actions) return index_key diff --git a/search/tests/unit/proxy/test_elasticsearch.py b/search/tests/unit/proxy/test_elasticsearch.py index 4e3c8b5fc7..6d47a36443 100644 --- a/search/tests/unit/proxy/test_elasticsearch.py +++ b/search/tests/unit/proxy/test_elasticsearch.py @@ -5,6 +5,7 @@ from typing import ( # noqa: F401 Any, Iterable, List, ) +from unittest import mock from unittest.mock import MagicMock, patch from elasticsearch_dsl import Search @@ -163,8 +164,9 @@ def test_setup_client(self) -> None: ) a = self.es_proxy.elasticsearch for client in [a, a.cat, a.cluster, a.indices, a.ingest, a.nodes, a.snapshot, a.tasks]: - self.assertEqual(client.transport.hosts[0]['host'], "0.0.0.0") - self.assertEqual(client.transport.hosts[0]['port'], 9200) + _host = client.transport.hosts[0] # type: ignore + self.assertEqual(_host['host'], "0.0.0.0") + self.assertEqual(_host['port'], 9200) @patch('search_service.proxy.elasticsearch.Elasticsearch', autospec=True) def test_setup_client_with_username_and_password(self, elasticsearch_mock: MagicMock) -> None: @@ -509,7 +511,6 @@ def test_create_document(self, mock_uuid: MagicMock) -> None: mock_elasticsearch = self.es_proxy.elasticsearch new_index_name = 'tester_index_name' mock_uuid.return_value = new_index_name - mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})]) start_data = [ Table(id='snowflake://blue.test_schema/bank_accounts', cluster='blue', column_names=['1', '2'], database='snowflake', schema='test_schema', description='A table for something', @@ -574,12 +575,17 @@ def test_create_document(self, mock_uuid: MagicMock) -> None: 'programmatic_descriptions': ["test"] } ] - mock_elasticsearch.bulk.return_value = {'errors': False} + + _get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _get_alias.return_value = dict([(new_index_name, {})]) + + _bulk = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _bulk.return_value = {'errors': False} expected_alias = 'table_search_index' result = self.es_proxy.create_document(data=start_data, index=expected_alias) self.assertEqual(expected_alias, result) - mock_elasticsearch.bulk.assert_called_with(expected_data) + _bulk.assert_called_with(body=expected_data) def test_update_document_with_no_data(self) -> None: expected = '' @@ -590,7 +596,8 @@ def test_update_document_with_no_data(self) -> None: def test_update_document(self, mock_uuid: MagicMock) -> None: mock_elasticsearch = self.es_proxy.elasticsearch new_index_name = 'tester_index_name' - mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})]) + _get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _get_alias.return_value = dict([(new_index_name, {})]) mock_uuid.return_value = new_index_name table_key = 'snowflake://blue.test_schema/bitcoin_wallets' expected_alias = 'table_search_index' @@ -632,14 +639,18 @@ def test_update_document(self, mock_uuid: MagicMock) -> None: ] result = self.es_proxy.update_document(data=data, index=expected_alias) self.assertEqual(expected_alias, result) - mock_elasticsearch.bulk.assert_called_with(expected_data) + _bulk = mock.create_autospec(mock_elasticsearch.bulk) + _bulk.assert_called_with(body=expected_data) @patch('uuid.uuid4') def test_delete_table_document(self, mock_uuid: MagicMock) -> None: mock_elasticsearch = self.es_proxy.elasticsearch new_index_name = 'tester_index_name' mock_uuid.return_value = new_index_name - mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})]) + + _get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _get_alias.return_value = dict([(new_index_name, {})]) + expected_alias = 'table_search_index' data = ['id1', 'id2'] @@ -650,14 +661,16 @@ def test_delete_table_document(self, mock_uuid: MagicMock) -> None: result = self.es_proxy.delete_document(data=data, index=expected_alias) self.assertEqual(expected_alias, result) - mock_elasticsearch.bulk.assert_called_with(expected_data) + _bulk = mock.create_autospec(mock_elasticsearch.bulk) + _bulk.assert_called_with(body=expected_data) @patch('uuid.uuid4') def test_delete_user_document(self, mock_uuid: MagicMock) -> None: mock_elasticsearch = self.es_proxy.elasticsearch new_index_name = 'tester_index_name' mock_uuid.return_value = new_index_name - mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})]) + _get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _get_alias.return_value = dict([(new_index_name, {})]) expected_alias = 'user_search_index' data = ['id1', 'id2'] @@ -668,14 +681,16 @@ def test_delete_user_document(self, mock_uuid: MagicMock) -> None: result = self.es_proxy.delete_document(data=data, index=expected_alias) self.assertEqual(expected_alias, result) - mock_elasticsearch.bulk.assert_called_with(expected_data) + _bulk = mock.create_autospec(mock_elasticsearch.bulk) + _bulk.assert_called_with(body=expected_data) @patch('uuid.uuid4') def test_delete_feature_document(self, mock_uuid: MagicMock) -> None: mock_elasticsearch = self.es_proxy.elasticsearch new_index_name = 'test_indx' mock_uuid.return_value = new_index_name - mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})]) + _get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias) + _get_alias.return_value = dict([(new_index_name, {})]) expected_alias = 'feature_search_index' data = ['id1', 'id2'] @@ -686,7 +701,8 @@ def test_delete_feature_document(self, mock_uuid: MagicMock) -> None: result = self.es_proxy.delete_document(data=data, index=expected_alias) self.assertEqual(expected_alias, result) - mock_elasticsearch.bulk.assert_called_with(expected_data) + _bulk = mock.create_autospec(mock_elasticsearch.bulk) + _bulk.assert_called_with(body=expected_data) def test_get_instance_string(self) -> None: result = self.es_proxy._get_instance('column', 'value')