Skip to content

Commit

Permalink
[dc] Adds jamf_inventory connector
Browse files Browse the repository at this point in the history
  • Loading branch information
Andrey Fedorov committed Nov 16, 2019
1 parent 0ecdd7f commit 925ca75
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 9 deletions.
10 changes: 1 addition & 9 deletions src/connectors/aws_collect.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import io
from typing import Dict, List, Generator

from connectors.utils import sts_assume_role, qmap_mp
from connectors.utils import sts_assume_role, qmap_mp, updated
from runners.helpers import db, log
from runners.utils import groups_of

Expand Down Expand Up @@ -372,14 +372,6 @@
}


def updated(d, *ds, **kwargs):
"""Shallow merges dictionaries together, mutating + returning first arg"""
for new_d in ds:
d.update(new_d)
if kwargs:
d.update(kwargs)
return d


def aws_collect(client, method, params=None):
if params is None:
Expand Down
73 changes: 73 additions & 0 deletions src/connectors/jamf_inventory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import asyncio
import aiohttp
import json
from json.decoder import JSONDecodeError
from random import random
from dateutil.parser import parse as parse_date

from connectors.utils import updated
from runners.helpers import db, log


CONNECTION_OPTIONS = [
{
'type': 'str',
'name': 'credentials',
'title': "API Credentials",
'prompt': "b64(username:password)",
'placeholder': "bWVvdzpodW50cmVzczIK",
'secret': True,
'required': True,
}
]

HEADERS = {}
REQUEST_SPREAD_IN_SECONDS = 180


async def fetch(session, url, fetch_over=0):
if fetch_over:
await asyncio.sleep(fetch_over * random())
async with session.get(
f'https://snowflake.jamfcloud.com/JSSResource{url}', headers=HEADERS
) as response:
txt = await response.text()
result = {'recorded_at': parse_date(response.headers.get('Date'))}
try:
return updated(result, json.loads(txt))
except JSONDecodeError:
log.info(f'GET {url} -> status({response.status}) text({txt})')
return result


def fetch_computer(s, cid):
return fetch(s, f'/computers/id/{cid}', fetch_over=REQUEST_SPREAD_IN_SECONDS)


async def main(table_name):
async with aiohttp.ClientSession() as session:
cids = [
c['id'] for c in (await fetch(session, '/computers')).get('computers', [])
]

log.info(f'loading {len(cids)} computer details')
computers = await asyncio.gather(
*[fetch_computer(session, cid) for cid in cids]
)

log.info(f'inserting {len(computers)} computers into {table_name}')
db.insert(
table_name,
[
updated(c.get('computer'), computer_id=cid)
for cid, c in zip(cids, computers)
],
)


def ingest(table_name, options):
global HEADERS
creds = options.get('credentials', '')
HEADERS = {'Authorization': f'Basic {creds}', 'Accept': 'application/json'}

asyncio.get_event_loop().run_until_complete(main(f'data.{table_name}'))
11 changes: 11 additions & 0 deletions src/connectors/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@
from runners.helpers.dbconfig import ROLE as SA_ROLE


def updated(d=None, *ds, **kwargs):
"""Shallow merges dictionaries together, mutating + returning first arg"""
if d is None:
d = {}
for new_d in ds:
d.update(new_d)
if kwargs:
d.update(kwargs)
return d


def qmap_mp(num_threads, f, args):
payloads = mp.JoinableQueue()
procs = []
Expand Down
1 change: 1 addition & 0 deletions src/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
packages=find_packages(),
include_package_data=True,
install_requires=[
'aiohttp[speedups]',
'fire==0.1.3',
'jira==2.0.0',
'PyYAML==4.2b1',
Expand Down

0 comments on commit 925ca75

Please sign in to comment.