Skip to content

Commit

Permalink
🎉 Initial commit.
Browse files Browse the repository at this point in the history
  • Loading branch information
mgorsk1 committed Jul 9, 2020
1 parent 19b5c7b commit 4fc6f8c
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 5 deletions.
92 changes: 88 additions & 4 deletions metadata_service/proxy/atlas_proxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
from typing import Any, Dict, List, Union, Optional

from amundsen_common.models.popular_table import PopularTable
from amundsen_common.models.table import Column, Statistics, Table, Tag, User
from amundsen_common.models.table import Column, Statistics, Table, Tag, User, Reader
from amundsen_common.models.user import User as UserEntity
from amundsen_common.models.dashboard import DashboardSummary
from atlasclient.client import Atlas
from atlasclient.exceptions import BadRequest
from atlasclient.models import EntityUniqueAttribute
from atlasclient.utils import (make_table_qualified_name,
parse_table_qualified_name)
parse_table_qualified_name,
extract_entities)
from beaker.cache import CacheManager
from beaker.util import parse_cache_config_options
from flask import current_app as app
Expand Down Expand Up @@ -41,6 +42,7 @@ class AtlasProxy(BaseProxy):
STATISTICS_FORMAT_SPEC = app.config['STATISTICS_FORMAT_SPEC']
BOOKMARK_TYPE = 'Bookmark'
USER_TYPE = 'User'
READER_TYPE = 'Reader'
QN_KEY = 'qualifiedName'
BOOKMARK_ACTIVE_KEY = 'active'
GUID_KEY = 'guid'
Expand Down Expand Up @@ -386,6 +388,7 @@ def get_table(self, *, table_uri: str) -> Table:
description=attrs.get('description') or attrs.get('comment'),
owners=[User(email=attrs.get('owner'))],
columns=columns,
table_readers=self._get_readers(attrs.get(self.QN_KEY)),
last_updated_timestamp=self._parse_date(table_details.get('updateTime')))

return table
Expand Down Expand Up @@ -596,8 +599,42 @@ def get_table_by_user_relation(self, *, user_email: str, relation_type: UserReso

return {'table': results}

def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, Any]:
pass
def get_frequently_used_tables(self, *, user_email: str) -> Dict[str, List[PopularTable]]:
user = self._driver.entity_unique_attribute(self.USER_TYPE, qualifiedName=user_email).entity

readers_guids = []
for user_reads in user['relationshipAttributes'].get('entityReads'):
entity_status = user_reads['entityStatus']
relationship_status = user_reads['relationshipStatus']

if entity_status == 'ACTIVE' and relationship_status == 'ACTIVE':
readers_guids.append(user_reads['guid'])

readers = extract_entities(self._driver.entity_bulk(guid=readers_guids, ignoreRelationships=True))

_results = {}
for reader in readers:
entity_uri = reader.attributes.get(self.ENTITY_URI_KEY)
count = reader.attributes.get('count')

if count:
details = self._extract_info_from_uri(table_uri=entity_uri)

_results[count] = dict(cluster=details.get('cluster'),
name=details.get('name'),
schema=details.get('db'),
database=details.get('entity'))

sorted_counts = sorted(_results.keys())

results = []
for count in sorted_counts:
data: dict = _results.get(count, dict())
table = PopularTable(**data)

results.append(table)

return {'table': results}

def add_resource_relation_by_user(self, *,
id: str,
Expand Down Expand Up @@ -652,6 +689,53 @@ def _parse_date(self, date: int) -> Optional[int]:
except Exception:
return None

def _get_readers(self, qualified_name: str, top: Optional[int] = None) -> List[Reader]:
params = {
'typeName': self.READER_TYPE,
'offset': '0',
'limit': top or 15,
'excludeDeletedEntities': True,
'entityFilters': {
'condition': 'AND',
'criterion': [
{
'attributeName': self.QN_KEY,
'operator': 'STARTSWITH',
'attributeValue': qualified_name.split('@')[0]
},
{
'attributeName': 'count',
'operator': 'gte',
'attributeValue': f'{app.config["POPULAR_TABLE_MINIMUM_READER_COUNT"]}'
}
]
},
'attributes': ['count', self.QN_KEY],
'sortBy': 'count',
'sortOrder': 'DESCENDING'
}

search_results = self._driver.search_basic.create(data=params, ignoreRelationships=False)

readers = []

for record in search_results.entities:
readers.append(record.guid)

results = []

if len(readers) > 0:
full_entities = extract_entities(self._driver.entity_bulk(guid=readers, ignoreRelationships=False))

for r in full_entities:
reader = Reader(user=User(email=r.relationshipAttributes['user']['displayText'],
user_id=r.relationshipAttributes['user']['displayText']),
read_count=r.attributes['count'])

results.append(reader)

return results

def get_dashboard(self,
dashboard_uri: str,
) -> DashboardDetailEntity:
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ neotime==1.7.1
pytz==2018.4
requests-aws4auth==0.9
statsd==3.2.1
pyatlasclient==1.0.3
pyatlasclient==1.0.4
beaker>=1.10.0
mocket==3.7.3
overrides==2.5
Expand Down

0 comments on commit 4fc6f8c

Please sign in to comment.