From 925ca7517ece91c83dfb9a95f5728db2757198c6 Mon Sep 17 00:00:00 2001 From: Andrey Fedorov Date: Fri, 15 Nov 2019 20:05:57 -0800 Subject: [PATCH] [dc] Adds jamf_inventory connector --- src/connectors/aws_collect.py | 10 +---- src/connectors/jamf_inventory.py | 73 ++++++++++++++++++++++++++++++++ src/connectors/utils.py | 11 +++++ src/setup.py | 1 + 4 files changed, 86 insertions(+), 9 deletions(-) create mode 100644 src/connectors/jamf_inventory.py diff --git a/src/connectors/aws_collect.py b/src/connectors/aws_collect.py index 32eb1091f..e970fad74 100644 --- a/src/connectors/aws_collect.py +++ b/src/connectors/aws_collect.py @@ -9,7 +9,7 @@ import io from typing import Dict, List, Generator -from connectors.utils import sts_assume_role, qmap_mp +from connectors.utils import sts_assume_role, qmap_mp, updated from runners.helpers import db, log from runners.utils import groups_of @@ -372,14 +372,6 @@ } -def updated(d, *ds, **kwargs): - """Shallow merges dictionaries together, mutating + returning first arg""" - for new_d in ds: - d.update(new_d) - if kwargs: - d.update(kwargs) - return d - def aws_collect(client, method, params=None): if params is None: diff --git a/src/connectors/jamf_inventory.py b/src/connectors/jamf_inventory.py new file mode 100644 index 000000000..22d0ce28c --- /dev/null +++ b/src/connectors/jamf_inventory.py @@ -0,0 +1,73 @@ +import asyncio +import aiohttp +import json +from json.decoder import JSONDecodeError +from random import random +from dateutil.parser import parse as parse_date + +from connectors.utils import updated +from runners.helpers import db, log + + +CONNECTION_OPTIONS = [ + { + 'type': 'str', + 'name': 'credentials', + 'title': "API Credentials", + 'prompt': "b64(username:password)", + 'placeholder': "bWVvdzpodW50cmVzczIK", + 'secret': True, + 'required': True, + } +] + +HEADERS = {} +REQUEST_SPREAD_IN_SECONDS = 180 + + +async def fetch(session, url, fetch_over=0): + if fetch_over: + await asyncio.sleep(fetch_over * random()) + async with session.get( + f'https://snowflake.jamfcloud.com/JSSResource{url}', headers=HEADERS + ) as response: + txt = await response.text() + result = {'recorded_at': parse_date(response.headers.get('Date'))} + try: + return updated(result, json.loads(txt)) + except JSONDecodeError: + log.info(f'GET {url} -> status({response.status}) text({txt})') + return result + + +def fetch_computer(s, cid): + return fetch(s, f'/computers/id/{cid}', fetch_over=REQUEST_SPREAD_IN_SECONDS) + + +async def main(table_name): + async with aiohttp.ClientSession() as session: + cids = [ + c['id'] for c in (await fetch(session, '/computers')).get('computers', []) + ] + + log.info(f'loading {len(cids)} computer details') + computers = await asyncio.gather( + *[fetch_computer(session, cid) for cid in cids] + ) + + log.info(f'inserting {len(computers)} computers into {table_name}') + db.insert( + table_name, + [ + updated(c.get('computer'), computer_id=cid) + for cid, c in zip(cids, computers) + ], + ) + + +def ingest(table_name, options): + global HEADERS + creds = options.get('credentials', '') + HEADERS = {'Authorization': f'Basic {creds}', 'Accept': 'application/json'} + + asyncio.get_event_loop().run_until_complete(main(f'data.{table_name}')) diff --git a/src/connectors/utils.py b/src/connectors/utils.py index 4665064c8..49bc43207 100644 --- a/src/connectors/utils.py +++ b/src/connectors/utils.py @@ -7,6 +7,17 @@ from runners.helpers.dbconfig import ROLE as SA_ROLE +def updated(d=None, *ds, **kwargs): + """Shallow merges dictionaries together, mutating + returning first arg""" + if d is None: + d = {} + for new_d in ds: + d.update(new_d) + if kwargs: + d.update(kwargs) + return d + + def qmap_mp(num_threads, f, args): payloads = mp.JoinableQueue() procs = [] diff --git a/src/setup.py b/src/setup.py index b41c43c70..93a0a0dc6 100644 --- a/src/setup.py +++ b/src/setup.py @@ -6,6 +6,7 @@ packages=find_packages(), include_package_data=True, install_requires=[ + 'aiohttp[speedups]', 'fire==0.1.3', 'jira==2.0.0', 'PyYAML==4.2b1',