Skip to content

Commit

Permalink
feat: Ingest Mode Analytics user (amundsen-io#282)
Browse files Browse the repository at this point in the history
* Ingest Mode Analytics user

* Flake8

* Update

* Added doc
  • Loading branch information
jinhyukchang authored Jun 8, 2020
1 parent e513e68 commit b902562
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 3 deletions.
24 changes: 24 additions & 0 deletions databuilder/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,27 @@ job = DefaultJob(conf=job_config,
job.launch()
```

#### [ModeDashboardUserExtractor](./databuilder/extractor/dashboard/mode_analytics/mode_dashboard_user_extractor.py)
An Extractor that extracts Mode user IDs and then updates the corresponding User nodes.

You can create a Databuilder job config like this. (Configuration related to the loader and publisher is omitted as it is mostly the same. Please take a look at this [example](#ModeDashboardExtractor) for the configuration that holds the loader and publisher.)

```python
extractor = ModeDashboardUserExtractor()
task = DefaultTask(extractor=extractor, loader=FsNeo4jCSVLoader())

job_config = ConfigFactory.from_dict({
'{}.{}'.format(extractor.get_scope(), ORGANIZATION): organization,
'{}.{}'.format(extractor.get_scope(), MODE_ACCESS_TOKEN): mode_token,
'{}.{}'.format(extractor.get_scope(), MODE_PASSWORD_TOKEN): mode_password,
})

job = DefaultJob(conf=job_config,
task=task,
publisher=Neo4jCsvPublisher())
job.launch()
```

#### [ModeDashboardUsageExtractor](./databuilder/extractor/dashboard/mode_analytics/mode_dashboard_usage_extractor.py)

An Extractor that extracts Mode dashboard's accumulated view count.
Expand Down Expand Up @@ -625,6 +646,9 @@ Transforms dictionary into model
#### [TimestampStringToEpoch](./databuilder/transformer/timestamp_string_to_epoch.py)
Transforms string timestamp into int epoch

#### [RemoveFieldTransformer](./databuilder/transformer/remove_field_transformer.py)
Removes the configured fields (keys) from a Dict record.


## List of loader
#### [FsNeo4jCSVLoader](https://github.com/lyft/amundsendatabuilder/blob/master/databuilder/loader/file_system_neo4j_csv_loader.py "FsNeo4jCSVLoader")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
import logging
from typing import Any # noqa: F401

from pyhocon import ConfigTree, ConfigFactory # noqa: F401
from requests.auth import HTTPBasicAuth

from databuilder import Scoped
from databuilder.extractor.base_extractor import Extractor
from databuilder.extractor.dashboard.mode_analytics.mode_dashboard_constants import ORGANIZATION, MODE_ACCESS_TOKEN, \
MODE_PASSWORD_TOKEN
from databuilder.extractor.dashboard.mode_analytics.mode_dashboard_utils import ModeDashboardUtils
from databuilder.rest_api.base_rest_api_query import RestApiQuerySeed
from databuilder.rest_api.rest_api_failure_handlers import HttpFailureSkipOnStatus
from databuilder.rest_api.rest_api_query import RestApiQuery
from databuilder.transformer.base_transformer import ChainedTransformer
from databuilder.transformer.dict_to_model import DictToModel, MODEL_CLASS
from databuilder.transformer.remove_field_transformer import RemoveFieldTransformer, FIELD_NAMES

LOGGER = logging.getLogger(__name__)


class ModeDashboardUserExtractor(Extractor):
    """
    An Extractor that pulls the members of a Mode organization and emits each of them as a User model
    carrying a mode_user_id attribute.
    """

    def init(self, conf):
        # type: (ConfigTree) -> None
        """
        Set up the underlying Mode REST API extractor and the transformer chain applied to its records.

        :param conf: configuration from which ORGANIZATION, MODE_ACCESS_TOKEN and MODE_PASSWORD_TOKEN are read
        """
        self._conf = conf

        self._extractor = ModeDashboardUtils.create_mode_rest_api_extractor(
            restapi_query=self._build_restapi_query(),
            conf=self._conf
        )

        # The User model accepts arbitrary extra attributes and pushes them to Neo4j, so the helper fields
        # that were only needed to build the REST calls are dropped before the record becomes a model.
        field_remover = RemoveFieldTransformer()
        field_remover_defaults = ConfigFactory.from_dict(
            {FIELD_NAMES: ['organization', 'mode_user_resource_path', 'product']})
        field_remover.init(
            conf=Scoped.get_scoped_conf(self._conf, field_remover.get_scope()).with_fallback(
                field_remover_defaults))

        model_builder = DictToModel()
        model_builder_defaults = ConfigFactory.from_dict(
            {MODEL_CLASS: 'databuilder.models.user.User'})
        model_builder.init(
            conf=Scoped.get_scoped_conf(self._conf, model_builder.get_scope()).with_fallback(
                model_builder_defaults))

        # Order matters: fields are removed first, then the remaining dict is turned into a model.
        self._transformer = ChainedTransformer(transformers=[field_remover, model_builder])

    def extract(self):
        # type: () -> Any
        """
        Fetch the next record from the REST API extractor and run it through the transformer chain.

        :return: the transformed record, or None when the underlying extractor returns a falsy record
        """
        raw_record = self._extractor.extract()
        return self._transformer.transform(record=raw_record) if raw_record else None

    def get_scope(self):
        # type: () -> str
        # NOTE(review): the scope says "owner" although this extractor handles users; the value is
        # kept as-is because job configs are keyed on it.
        return 'extractor.mode_dashboard_owner'

    def _build_restapi_query(self):
        # type: () -> RestApiQuery
        """
        Build the REST API query. To get Mode users, two APIs are joined together: the organization
        memberships API (user id and user resource path) and the user API (email).

        :return: A RestApiQuery that provides Mode users
        """
        # Seed query record for the next query api to join with. is_active and updated_at are left as
        # None and do_not_update_empty_attribute is set on every record
        # (presumably so the User model leaves those attributes untouched; verify against the User model).
        seed_query = RestApiQuerySeed(seed_record=[{
            'organization': self._conf.get_string(ORGANIZATION),
            'is_active': None,
            'updated_at': None,
            'do_not_update_empty_attribute': True,
        }])

        # The same basic-auth params are used for both REST calls below.
        access_token = self._conf.get_string(MODE_ACCESS_TOKEN)
        password_token = self._conf.get_string(MODE_PASSWORD_TOKEN)
        params = {'auth': HTTPBasicAuth(access_token, password_token)}

        # memberships
        # https://mode.com/developer/api-reference/management/organization-memberships/#listMemberships
        memberships_query = RestApiQuery(
            query_to_join=seed_query,
            url='https://app.mode.com/api/{organization}/memberships',
            params=params,
            json_path='(_embedded.memberships[*].member_username) | (_embedded.memberships[*]._links.user.href)',
            field_names=['mode_user_id', 'mode_user_resource_path'],
            skip_no_result=True,
            json_path_contains_or=True)

        # user
        # https://mode.com/developer/api-reference/management/users/
        # The failure handler is configured with status code 404 as the one to skip.
        not_found_handler = HttpFailureSkipOnStatus(status_codes_to_skip={404})
        return RestApiQuery(
            query_to_join=memberships_query,
            url='https://app.mode.com{mode_user_resource_path}',
            params=params,
            json_path='email',
            field_names=['email'],
            skip_no_result=True,
            can_skip_failure=not_found_handler.can_skip_failure)
15 changes: 14 additions & 1 deletion databuilder/databuilder/models/user.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ def __init__(self,
is_active=True, # type: bool
updated_at=0, # type: int
role_name='', # type: str
do_not_update_empty_attribute=False, # type: bool
**kwargs # type: Dict
):
# type: (...) -> None
Expand All @@ -62,6 +63,8 @@ def __init__(self,
then we will have a cron job to update the ex-employee nodes based on
the case if this timestamp hasn't been updated for two weeks.
:param role_name: the role_name of the user (e.g swe)
:param do_not_update_empty_attribute: If False, all empty or not defined params will be overwritten with
empty string.
:param kwargs: Any K/V attributes we want to update the
"""
self.first_name = first_name
Expand All @@ -79,6 +82,7 @@ def __init__(self,
self.is_active = is_active
self.updated_at = updated_at
self.role_name = role_name
self.do_not_update_empty_attribute = do_not_update_empty_attribute
self.attrs = None
if kwargs:
self.attrs = copy.deepcopy(kwargs)
Expand Down Expand Up @@ -132,14 +136,23 @@ def create_nodes(self):
result_node[User.USER_NODE_TEAM] = self.team_name if self.team_name else ''
result_node[User.USER_NODE_EMPLOYEE_TYPE] = self.employee_type if self.employee_type else ''
result_node[User.USER_NODE_SLACK_ID] = self.slack_id if self.slack_id else ''
result_node[User.USER_NODE_UPDATED_AT] = self.updated_at if self.updated_at else 0
result_node[User.USER_NODE_ROLE_NAME] = self.role_name if self.role_name else ''

if self.updated_at:
result_node[User.USER_NODE_UPDATED_AT] = self.updated_at
elif not self.do_not_update_empty_attribute:
result_node[User.USER_NODE_UPDATED_AT] = 0

if self.attrs:
for k, v in self.attrs.items():
if k not in result_node:
result_node[k] = v

if self.do_not_update_empty_attribute:
for k, v in list(result_node.items()):
if not v:
del result_node[k]

return [result_node]

def create_relation(self):
Expand Down
34 changes: 34 additions & 0 deletions databuilder/databuilder/transformer/remove_field_transformer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import logging
from typing import Any, Dict # noqa: F401

from pyhocon import ConfigTree # noqa: F401

from databuilder.transformer.base_transformer import Transformer

FIELD_NAMES = 'field_names' # field name to be removed

LOGGER = logging.getLogger(__name__)


class RemoveFieldTransformer(Transformer):
    """
    Transformer that drops a configured list of fields (keys) from a Dict record.
    """

    def init(self, conf):
        # type: (ConfigTree) -> None
        """
        Read the list of keys to drop.

        :param conf: configuration holding the FIELD_NAMES list
        """
        self._field_names = conf.get_list(FIELD_NAMES)

    def transform(self, record):
        # type: (Dict[str, Any]) -> Dict[str, Any]
        """
        Drop every configured key from the record. The record is modified in place and keys that
        are not present in it are ignored.

        :param record: Dict to remove the fields from
        :return: the same Dict instance without the configured keys
        """
        for field_name in self._field_names:
            # pop with a default never raises, so missing keys are silently skipped
            record.pop(field_name, None)

        return record

    def get_scope(self):
        # type: () -> str
        return 'transformer.remove_field'
2 changes: 2 additions & 0 deletions databuilder/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,5 @@ unicodecsv==0.14.1,<1.0

httplib2>=0.18.0
unidecode

requests==2.23.0,<3.0
2 changes: 1 addition & 1 deletion databuilder/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from setuptools import setup, find_packages


__version__ = '2.6.0'
__version__ = '2.6.1'

requirements_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), 'requirements.txt')
with open(requirements_path) as requirements_file:
Expand Down
20 changes: 19 additions & 1 deletion databuilder/tests/unit/models/test_user.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

from databuilder.models.neo4j_csv_serde import RELATION_START_KEY, RELATION_START_LABEL, RELATION_END_KEY, \
RELATION_END_LABEL, RELATION_TYPE, RELATION_REVERSE_TYPE

from databuilder.models.user import User


Expand Down Expand Up @@ -71,3 +70,22 @@ def test_create_relation(self):
}

self.assertTrue(relation in relations)

def test_not_including_empty_attribute(self):
    # type: () -> None
    """
    By default empty attributes are written out with placeholder values; with
    do_not_update_empty_attribute=True they are left out of the node entirely.
    """
    # Default behaviour: unset attributes show up as '' (or 0 for updated_at).
    default_user = User(email='test@email.com',
                        foo='bar')
    default_node = default_user.create_next_node()
    expected_default_node = {
        'KEY': 'test@email.com',
        'LABEL': 'User',
        'email': 'test@email.com',
        'is_active:UNQUOTED': True,
        'first_name': '',
        'last_name': '',
        'full_name': '',
        'github_username': '',
        'team_name': '',
        'employee_type': '',
        'slack_id': '',
        'role_name': '',
        'updated_at': 0,
        'foo': 'bar',
    }
    self.assertDictEqual(default_node, expected_default_node)

    # Opt-in behaviour: only attributes that carry a value are kept.
    sparse_user = User(email='test@email.com',
                       foo='bar',
                       is_active=None,
                       do_not_update_empty_attribute=True)
    sparse_node = sparse_user.create_next_node()
    expected_sparse_node = {
        'KEY': 'test@email.com',
        'LABEL': 'User',
        'email': 'test@email.com',
        'foo': 'bar',
    }
    self.assertDictEqual(sparse_node, expected_sparse_node)
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import unittest

from pyhocon import ConfigFactory

from databuilder.transformer.remove_field_transformer import RemoveFieldTransformer, FIELD_NAMES


class TestRemoveFieldTransformer(unittest.TestCase):
    """
    Unit tests for RemoveFieldTransformer.
    """

    def test_conversion(self):
        # type: () -> None
        """All configured fields are present in the record and get removed."""
        conf = ConfigFactory.from_dict({FIELD_NAMES: ['foo', 'bar']})
        remove_field_transformer = RemoveFieldTransformer()
        remove_field_transformer.init(conf=conf)

        record = {
            'foo': 'foo_val',
            'bar': 'bar_val',
            'baz': 'baz_val',
        }
        result = remove_field_transformer.transform(record)

        self.assertDictEqual({'baz': 'baz_val'}, result)

    def test_conversion_missing_field(self):
        # type: () -> None
        """A configured field that is absent from the record ('bar') is ignored."""
        conf = ConfigFactory.from_dict({FIELD_NAMES: ['foo', 'bar']})
        remove_field_transformer = RemoveFieldTransformer()
        remove_field_transformer.init(conf=conf)

        record = {
            'foo': 'foo_val',
            'baz': 'baz_val',
            'john': 'doe',
        }
        result = remove_field_transformer.transform(record)

        self.assertDictEqual({'baz': 'baz_val', 'john': 'doe'}, result)


if __name__ == '__main__':
unittest.main()

0 comments on commit b902562

Please sign in to comment.