Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Upgrades ES to 7.13.3 #1386

Merged
merged 3 commits into from
Jul 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ def publish_impl(self) -> None:
cnt = 0

# create new index with mapping
self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping)
self.elasticsearch_client.indices.create(index=self.elasticsearch_new_index, body=self.elasticsearch_mapping,
params={'include_type_name': 'true'})
for action in actions:
index_row = dict(index=dict(_index=self.elasticsearch_new_index,
_type=self.elasticsearch_type))
Expand Down
2 changes: 1 addition & 1 deletion databuilder/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0

elasticsearch>=6.2.0,<7.0
elasticsearch>=6.2.0,<8.0
neo4j-driver>=1.7.2,<2.0
requests>=2.25.0,<3.0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def test_publish_with_data_and_no_old_index(self) -> None:
# ensure indices create endpoint was called
default_mapping = ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING
self.mock_es_client.indices.create.assert_called_once_with(index=self.test_es_new_index,
body=default_mapping)
body=default_mapping,
params={'include_type_name': 'true'})

# bulk endpoint called once
self.mock_es_client.bulk.assert_called_once_with(
Expand Down Expand Up @@ -102,7 +103,8 @@ def test_publish_with_data_and_old_index(self) -> None:
# ensure indices create endpoint was called
default_mapping = ElasticsearchPublisher.DEFAULT_ELASTICSEARCH_INDEX_MAPPING
self.mock_es_client.indices.create.assert_called_once_with(index=self.test_es_new_index,
body=default_mapping)
body=default_mapping,
params={'include_type_name': 'true'})

# bulk endpoint called once
self.mock_es_client.bulk.assert_called_once_with(
Expand Down
4 changes: 3 additions & 1 deletion docker-amundsen-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ services:
networks:
- amundsennet
elasticsearch:
image: elasticsearch:6.7.0
image: elasticsearch:7.13.3
container_name: es_amundsen
ports:
- 9200:9200
Expand All @@ -28,6 +28,8 @@ services:
nofile:
soft: 65536
hard: 65536
environment:
- discovery.type=single-node
amundsensearch:
build:
context: .
Expand Down
4 changes: 3 additions & 1 deletion docker-amundsen.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ services:
networks:
- amundsennet
elasticsearch:
image: elasticsearch:6.7.0
image: elasticsearch:7.13.3
container_name: es_amundsen
ports:
- 9200:9200
Expand All @@ -32,6 +32,8 @@ services:
nofile:
soft: 65536
hard: 65536
environment:
- discovery.type=single-node
amundsensearch:
image: amundsendev/amundsen-search:2.4.1
container_name: amundsensearch
Expand Down
4 changes: 2 additions & 2 deletions search/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@
# SPDX-License-Identifier: Apache-2.0

pyatlasclient==1.0.3
elasticsearch==6.8.2
elasticsearch-dsl==6.4.0
elasticsearch==7.13.3
elasticsearch-dsl==7.4.0
3 changes: 2 additions & 1 deletion search/search_service/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# SPDX-License-Identifier: Apache-2.0

import os
from typing import Any, Optional

ELASTICSEARCH_INDEX_KEY = 'ELASTICSEARCH_INDEX'
SEARCH_PAGE_SIZE_KEY = 'SEARCH_PAGE_SIZE'
Expand Down Expand Up @@ -47,7 +48,7 @@ class LocalConfig(Config):
PORT=PROXY_PORT)
)
PROXY_CLIENT = PROXY_CLIENTS[os.environ.get('PROXY_CLIENT', 'ELASTICSEARCH')]
PROXY_CLIENT_KEY = os.environ.get('PROXY_CLIENT_KEY')
PROXY_CLIENT_KEY = os.environ.get('PROXY_CLIENT_KEY') # type: Optional[Any]
PROXY_USER = os.environ.get('CREDENTIALS_PROXY_USER', 'elastic')
PROXY_PASSWORD = os.environ.get('CREDENTIALS_PROXY_PASSWORD', 'elastic')

Expand Down
18 changes: 14 additions & 4 deletions search/search_service/proxy/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import NotFoundError
from elasticsearch_dsl import Search, query
from elasticsearch_dsl.utils import AttrDict
from flask import current_app

from search_service import config
Expand Down Expand Up @@ -284,7 +285,13 @@ def _get_search_result(self, page_index: int,
except Exception:
LOGGING.exception('The record doesnt contain specified field.')

return search_result_model(total_results=response.hits.total,
# Support ES 7.x and newer versions of elasticsearch_dsl, where hits.total may be an object exposing `.value` rather than a plain number
if isinstance(response.hits.total, AttrDict):
_total = response.hits.total.value
else:
_total = response.hits.total

return search_result_model(total_results=_total,
results=results)

def _get_instance(self, attr: str, val: Any) -> Any:
Expand All @@ -311,6 +318,9 @@ def _search_helper(self, page_index: int,
:param query_name: name of query to query the ES
:return:
"""
# Support ES 7.x: explicitly request total-hit tracking (see the 10,000-hit default described in the reference below)
# ref: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/breaking-changes-7.0.html#track-total-hits-10000-default # noqa: E501
client = client.extra(track_total_hits=True)
verdan marked this conversation as resolved.
Show resolved Hide resolved

if query_name:
q = query.Q(query_name)
Expand Down Expand Up @@ -716,7 +726,7 @@ def _build_delete_actions(self, data: List[str], index_key: str, type: str) -> L
return [{'delete': {'_index': index_key, '_id': id, '_type': type}} for id in data]

def _bulk_helper(self, actions: List[Dict[str, Any]]) -> None:
result = self.elasticsearch.bulk(actions)
result = self.elasticsearch.bulk(body=actions)

if result['errors']:
# ES's error messages are nested within elasticsearch objects and can
Expand All @@ -732,7 +742,7 @@ def _fetch_old_index(self, alias: str) -> List[str]:
:return: list of elasticsearch indices
"""
try:
indices = self.elasticsearch.indices.get_alias(alias).keys()
indices = self.elasticsearch.indices.get_alias(index=alias).keys()
return indices
except NotFoundError:
LOGGING.warn('Received index not found error from Elasticsearch', exc_info=True)
Expand All @@ -757,5 +767,5 @@ def _get_mapping(alias: str) -> str:

# alias our new index
index_actions = {'actions': [{'add': {'index': index_key, 'alias': alias}}]}
self.elasticsearch.indices.update_aliases(index_actions)
self.elasticsearch.indices.update_aliases(body=index_actions)
return index_key
42 changes: 29 additions & 13 deletions search/tests/unit/proxy/test_elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from typing import ( # noqa: F401
Any, Iterable, List,
)
from unittest import mock
from unittest.mock import MagicMock, patch

from elasticsearch_dsl import Search
Expand Down Expand Up @@ -163,8 +164,9 @@ def test_setup_client(self) -> None:
)
a = self.es_proxy.elasticsearch
for client in [a, a.cat, a.cluster, a.indices, a.ingest, a.nodes, a.snapshot, a.tasks]:
self.assertEqual(client.transport.hosts[0]['host'], "0.0.0.0")
self.assertEqual(client.transport.hosts[0]['port'], 9200)
_host = client.transport.hosts[0] # type: ignore
self.assertEqual(_host['host'], "0.0.0.0")
self.assertEqual(_host['port'], 9200)

@patch('search_service.proxy.elasticsearch.Elasticsearch', autospec=True)
def test_setup_client_with_username_and_password(self, elasticsearch_mock: MagicMock) -> None:
Expand Down Expand Up @@ -509,7 +511,6 @@ def test_create_document(self, mock_uuid: MagicMock) -> None:
mock_elasticsearch = self.es_proxy.elasticsearch
new_index_name = 'tester_index_name'
mock_uuid.return_value = new_index_name
mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})])
start_data = [
Table(id='snowflake://blue.test_schema/bank_accounts', cluster='blue', column_names=['1', '2'],
database='snowflake', schema='test_schema', description='A table for something',
Expand Down Expand Up @@ -574,12 +575,17 @@ def test_create_document(self, mock_uuid: MagicMock) -> None:
'programmatic_descriptions': ["test"]
}
]
mock_elasticsearch.bulk.return_value = {'errors': False}

_get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_get_alias.return_value = dict([(new_index_name, {})])

_bulk = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_bulk.return_value = {'errors': False}

expected_alias = 'table_search_index'
result = self.es_proxy.create_document(data=start_data, index=expected_alias)
self.assertEqual(expected_alias, result)
mock_elasticsearch.bulk.assert_called_with(expected_data)
_bulk.assert_called_with(body=expected_data)

def test_update_document_with_no_data(self) -> None:
expected = ''
Expand All @@ -590,7 +596,8 @@ def test_update_document_with_no_data(self) -> None:
def test_update_document(self, mock_uuid: MagicMock) -> None:
mock_elasticsearch = self.es_proxy.elasticsearch
new_index_name = 'tester_index_name'
mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})])
_get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_get_alias.return_value = dict([(new_index_name, {})])
mock_uuid.return_value = new_index_name
table_key = 'snowflake://blue.test_schema/bitcoin_wallets'
expected_alias = 'table_search_index'
Expand Down Expand Up @@ -632,14 +639,18 @@ def test_update_document(self, mock_uuid: MagicMock) -> None:
]
result = self.es_proxy.update_document(data=data, index=expected_alias)
self.assertEqual(expected_alias, result)
mock_elasticsearch.bulk.assert_called_with(expected_data)
_bulk = mock.create_autospec(mock_elasticsearch.bulk)
_bulk.assert_called_with(body=expected_data)

@patch('uuid.uuid4')
def test_delete_table_document(self, mock_uuid: MagicMock) -> None:
mock_elasticsearch = self.es_proxy.elasticsearch
new_index_name = 'tester_index_name'
mock_uuid.return_value = new_index_name
mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})])

_get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_get_alias.return_value = dict([(new_index_name, {})])

expected_alias = 'table_search_index'
data = ['id1', 'id2']

Expand All @@ -650,14 +661,16 @@ def test_delete_table_document(self, mock_uuid: MagicMock) -> None:
result = self.es_proxy.delete_document(data=data, index=expected_alias)

self.assertEqual(expected_alias, result)
mock_elasticsearch.bulk.assert_called_with(expected_data)
_bulk = mock.create_autospec(mock_elasticsearch.bulk)
_bulk.assert_called_with(body=expected_data)

@patch('uuid.uuid4')
def test_delete_user_document(self, mock_uuid: MagicMock) -> None:
mock_elasticsearch = self.es_proxy.elasticsearch
new_index_name = 'tester_index_name'
mock_uuid.return_value = new_index_name
mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})])
_get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_get_alias.return_value = dict([(new_index_name, {})])
expected_alias = 'user_search_index'
data = ['id1', 'id2']

Expand All @@ -668,14 +681,16 @@ def test_delete_user_document(self, mock_uuid: MagicMock) -> None:
result = self.es_proxy.delete_document(data=data, index=expected_alias)

self.assertEqual(expected_alias, result)
mock_elasticsearch.bulk.assert_called_with(expected_data)
_bulk = mock.create_autospec(mock_elasticsearch.bulk)
_bulk.assert_called_with(body=expected_data)

@patch('uuid.uuid4')
def test_delete_feature_document(self, mock_uuid: MagicMock) -> None:
mock_elasticsearch = self.es_proxy.elasticsearch
new_index_name = 'test_indx'
mock_uuid.return_value = new_index_name
mock_elasticsearch.indices.get_alias.return_value = dict([(new_index_name, {})])
_get_alias = mock.create_autospec(mock_elasticsearch.indices.get_alias)
_get_alias.return_value = dict([(new_index_name, {})])
expected_alias = 'feature_search_index'
data = ['id1', 'id2']

Expand All @@ -686,7 +701,8 @@ def test_delete_feature_document(self, mock_uuid: MagicMock) -> None:
result = self.es_proxy.delete_document(data=data, index=expected_alias)

self.assertEqual(expected_alias, result)
mock_elasticsearch.bulk.assert_called_with(expected_data)
_bulk = mock.create_autospec(mock_elasticsearch.bulk)
_bulk.assert_called_with(body=expected_data)

def test_get_instance_string(self) -> None:
result = self.es_proxy._get_instance('column', 'value')
Expand Down