From 79c3702480fe79825401f2ddd6cfa9d0c489d92b Mon Sep 17 00:00:00 2001 From: sfc-gh-afedorov Date: Fri, 11 Sep 2020 17:59:52 -0700 Subject: [PATCH] [ingest] Removes agari and ingest_runner.py this was deprecated by data connectors awhile ago --- src/ingestion/__init__.py | 0 src/ingestion/agari.py | 97 ------------------------------------ src/runners/ingest_runner.py | 27 ---------- src/runners/run.py | 10 +--- 4 files changed, 1 insertion(+), 133 deletions(-) delete mode 100644 src/ingestion/__init__.py delete mode 100644 src/ingestion/agari.py delete mode 100644 src/runners/ingest_runner.py diff --git a/src/ingestion/__init__.py b/src/ingestion/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/src/ingestion/agari.py b/src/ingestion/agari.py deleted file mode 100644 index b43edfd72..000000000 --- a/src/ingestion/agari.py +++ /dev/null @@ -1,97 +0,0 @@ -import datetime -import requests -from os import environ -from runners.helpers import db, log - -AGARI_TOKEN = environ.get('AGARI_TOKEN') -AGARI_SECRET = environ.get('AGARI_SECRET') -AGARI_TABLE = environ.get('AGARI_TABLE') - -AGARI_TRUST_CUTOFF = 5.0 - -URLS = ['https://api.agari.com/v1/ep/messages'] - - -# Agari provides bearer auth tokens, -def gen_headers(): - r = requests.post( - 'https://api.agari.com/oauth/token', - data={'client_id': AGARI_TOKEN, 'client_secret': AGARI_SECRET}, - ) - token = r.json()['access_token'] - return {'Authorization': f'Bearer {token}'} - - -def load_data(messages): - data = [(m, m['date']) for m in messages] - try: - db.insert(AGARI_TABLE, data, select='PARSE_JSON(column1), column2') - except Exception as e: - log.error("failed to ingest data", e) - - -def get_newest_timestamp(): - # check table in snowflake and get most recent timestamp - query = f"SELECT raw FROM {AGARI_TABLE} ORDER BY event_time DESC LIMIT 1" - try: - return list(db.fetch(query))[0]['RAW']['date'] - except Exception: - log.error("no earlier data found") - return None - - -def enrich(message, headers): - url = f"https://api.agari.com/v1/ep/messages/{message['id']}" - r = requests.get(url=url, headers=headers) - data = r.json() - return data['message'] - - -def process_endpoint(url): - params = {'start_date': get_newest_timestamp(), 'limit': 100, 'offset': 0} - headers = gen_headers() - - # If there's no start date defined. Agari defaults to one day ago. - if params['start_date'] is None: - params.pop('start_date') - - while True: - r = requests.get(url=url, params=params, headers=headers) - data = r.json() - log.info(params) - - if r.status_code != 200: - log.error(f"Ingest request for {url} failed.") - db.record_failed_ingestion(AGARI_TABLE, r, datetime.datetime.utcnow()) - break - - for message in data['messages']: - if float(message['message_trust_score']) <= AGARI_TRUST_CUTOFF: - message['details'] = enrich(message, headers) - - load_data(data['messages']) - - if params['offset'] == 9900: # Maximum offset is 9900 - params = {'start_date': get_newest_timestamp(), 'limit': 100, 'offset': 0} - - elif ( - len(data['messages']) == params['limit'] - ): # if we got the limit of messages, get the next page - params['offset'] += params['limit'] - else: - break - - -def main(): - reqenv = {'AGARI_TOKEN', 'AGARI_SECRET', 'AGARI_TABLE'} - missingenv = reqenv - set(environ) - if missingenv: - log.fatal(f"missing env vars: {missingenv}") - - for url in URLS: - process_endpoint(url) - - -if __name__ == '__main__': - if db.connect(): - main() diff --git a/src/runners/ingest_runner.py b/src/runners/ingest_runner.py deleted file mode 100644 index 2043b842a..000000000 --- a/src/runners/ingest_runner.py +++ /dev/null @@ -1,27 +0,0 @@ -'''SA Ingestion Runner (SAIR) - -SAIR processes all the files in the src/ingestion folder -''' - -import os -import subprocess - -from runners.helpers import log - - -def main(script=None): - scripts = [script] if script else os.listdir('../ingestion') - - for name in scripts: - log.info(f"--- Ingesting using {name}") - try: - res = subprocess.call(f"python ../ingestion/{name}", shell=True) - log.info("subprocess returns:", res) - except Exception as e: - log.error(f"exception raised:", e) - finally: - log.info(f"--- {name} finished") - - -if __name__ == "__main__": - main() diff --git a/src/runners/run.py b/src/runners/run.py index bd80093dc..304a5be6b 100644 --- a/src/runners/run.py +++ b/src/runners/run.py @@ -2,7 +2,7 @@ import fire -from runners import ingest_runner, connectors_runner +from runners import connectors_runner from runners import alert_queries_runner from runners import alert_suppressions_runner @@ -21,10 +21,6 @@ def main(target="all", *rule_names): for rule_name in rule_names: connectors_runner.main(rule_name.upper()) - elif target == "ingest" and rule_names: - for rule_name in rule_names: - ingest_runner.main(rule_names) - elif target == "processor": alert_processor.main() @@ -73,10 +69,6 @@ def main(target="all", *rule_names): alert_processor.main() alert_dispatcher.main() - if target in ['ingest']: - ingest_runner.main() - connectors_runner.main() - if __name__ == '__main__': fire.Fire(main)