Skip to content

Commit

Permalink
v1.6.1 Polish and Fixes
Browse files Browse the repository at this point in the history
* Adds owner field to Violations and default values for missing fields
* Adds config account selection option to installer
* Adds data views for querying rules by tag
* Adds query_name set in Alert Query Runners instead of in AQs

* Fixes default value of alert event_time in WebUI
* Fixes bugs in WebUI, installer, and ingestion script
* Fixes bugs in VQ runner metadata run and error recording
  • Loading branch information
sfc-gh-afedorov authored Apr 4, 2019
1 parent 95fc587 commit c45c2ad
Show file tree
Hide file tree
Showing 19 changed files with 279 additions and 170 deletions.
2 changes: 1 addition & 1 deletion src/ingestion/list_aws_accounts.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def load_accounts_list(sf_client, accounts_list):
def main():
sf_client = get_snowflake_client()
current_time = datetime.datetime.now(datetime.timezone.utc)
last_time = sf_client.cursor().execute(f'SELECT max(timestamp) FROM {AWS_ACCOUNTS_TABLE}').fetchall[0][0]
last_time = sf_client.cursor().execute(f'SELECT max(timestamp) FROM {AWS_ACCOUNTS_TABLE}').fetchall()[0][0]
if (current_time - last_time).seconds > 86400:
client = get_aws_client()
accounts_list = get_accounts_list(client)
Expand Down
17 changes: 10 additions & 7 deletions src/ingestion/okta_ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
ALOOMA_SDK = alooma_pysdk.PythonSDK(INPUT_TOKEN)
OKTA_API_KEY = environ.get('OKTA_API_KEY')
AUTH = "SSWS "+OKTA_API_KEY
OKTA_URL = environ.get('OKTA_URL')
OKTA_TABLE = environ.get('OKTA_TABLE')

HEADERS = {'Accept': 'application/json', 'Content-Type': 'application/json', 'Authorization': AUTH}

Expand All @@ -24,17 +26,18 @@ def process_logs(logs):
def get_timestamp():

# Once pipelines are more strongly integrated with the installer, this table should be a variable
timestamp_query = """
SELECT PUBLISHED from SECURITY.ALOOMA.SNOWBIZ_OKTA
order by PUBLISHED desc
timestamp_query = f"""
SELECT EVENT_TIME from {OKTA_TABLE}
WHERE EVENT_TIME IS NOT NULL
order by EVENT_TIME desc
limit 1
"""
try:
_, ts = db.connect_and_fetchall(timestamp_query)
print(ts)
log.info(ts)
ts = ts[0][0]
ts = ts.strftime("%Y-%m-%dT%H:%M:%S.000Z")
print(ts)
log.info(ts)
if len(ts) < 1:
log.error("The okta timestamp is too short or doesn't exist; defaulting to one hour ago")
ts = datetime.datetime.now() - datetime.timedelta(hours=1)
Expand All @@ -46,13 +49,13 @@ def get_timestamp():
ts = ts.strftime("%Y-%m-%dT%H:%M:%S.000Z")

ret = {'since': ts}
print(ret)
log.info(ret)

return ret


def main():
url = 'https://snowbiz.okta.com/api/v1/logs'
url = OKTA_URL
log.info("starting loop")
timestamp = get_timestamp()
while 1:
Expand Down
7 changes: 5 additions & 2 deletions src/runners/alert_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,11 @@ def main():
# Record the new ticket id in the alert table
record_ticket_id(ctx, ticket_id, alert['ALERT_ID'])

if CLOUDWATCH_METRICS:
log.metric('Run', 'SnowAlert', [{'Name': 'Component', 'Value': 'Alert Handler'}], 1)
try:
if CLOUDWATCH_METRICS:
log.metric('Run', 'SnowAlert', [{'Name': 'Component', 'Value': 'Alert Handler'}], 1)
except Exception as e:
log.error("Cloudwatch metric logging failed", e)


if __name__ == "__main__":
Expand Down
14 changes: 5 additions & 9 deletions src/runners/alert_queries_runner.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#!/usr/bin/env python

import json
import hashlib
import uuid
import datetime
from typing import Any, Dict, List, Tuple
Expand All @@ -21,12 +20,6 @@
QUERY_HISTORY: List = []


def alert_group(alert) -> str:
return hashlib.md5(
f"{alert['OBJECT']}{alert['DESCRIPTION']}".encode('utf-8')
).hexdigest()


def log_alerts(ctx, alerts):
if len(alerts):
print("Recording alerts.")
Expand Down Expand Up @@ -121,8 +114,11 @@ def main(rule_name=None):
}
log.metadata_record(ctx, RUN_METADATA, table=RUN_METADATA_TABLE)

if CLOUDWATCH_METRICS:
log.metric('Run', 'SnowAlert', [{'Name': 'Component', 'Value': 'Alert Query Runner'}], 1)
try:
if CLOUDWATCH_METRICS:
log.metric('Run', 'SnowAlert', [{'Name': 'Component', 'Value': 'Alert Query Runner'}], 1)
except Exception as e:
log.error("Cloudwatch metric logging failed: ", e)


if __name__ == '__main__':
Expand Down
9 changes: 6 additions & 3 deletions src/runners/alert_suppressions_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ def log_failure(ctx, suppression_name, e, event_data=None, description=None):
alerts = [json.dumps({
'ALERT_ID': uuid.uuid4().hex,
'QUERY_ID': 'b1d02051dd2c4d62bb75274f2ee5996a',
'QUERY_NAME': 'Suppression Runner Failure',
'QUERY_NAME': ['Suppression Runner Failure'],
'ENVIRONMENT': 'Suppressions',
'SOURCES': 'Suppression Runner',
'ACTOR': 'Suppression Runner',
Expand Down Expand Up @@ -154,8 +154,11 @@ def main():

log.metadata_record(ctx, RUN_METADATA, table=RUN_METADATA_TABLE)

if CLOUDWATCH_METRICS:
log.metric('Run', 'SnowAlert', [{'Name': 'Component', 'Value': 'Alert Suppression Runner'}], 1)
try:
if CLOUDWATCH_METRICS:
log.metric('Run', 'SnowAlert', [{'Name': 'Component', 'Value': 'Alert Suppression Runner'}], 1)
except Exception as e:
log.error("Cloudwatch metric logging failed: ", e)


if __name__ == '__main__':
Expand Down
34 changes: 18 additions & 16 deletions src/runners/config.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,34 @@
import os
from os import environ
import uuid

from runners.helpers.dbconfig import DATABASE

ENV = environ.get('SA_ENV', 'unset')

# generated once per runtime
RUN_ID = uuid.uuid4().hex

# schema names
DATA_SCHEMA_NAME = os.environ.get('SA_DATA_SCHEMA_NAME', "data")
RULES_SCHEMA_NAME = os.environ.get('SA_RULES_SCHEMA_NAME', "rules")
RESULTS_SCHEMA_NAME = os.environ.get('SA_RESULTS_SCHEMA_NAME', "results")
DATA_SCHEMA_NAME = environ.get('SA_DATA_SCHEMA_NAME', "data")
RULES_SCHEMA_NAME = environ.get('SA_RULES_SCHEMA_NAME', "rules")
RESULTS_SCHEMA_NAME = environ.get('SA_RESULTS_SCHEMA_NAME', "results")

# table names
RESULTS_ALERTS_TABLE_NAME = os.environ.get('SA_RESULTS_ALERTS_TABLE_NAME', "alerts")
RESULTS_VIOLATIONS_TABLE_NAME = os.environ.get('SA_RESULTS_VIOLATIONS_TABLE_NAME', "violations")
QUERY_METADATA_TABLE_NAME = os.environ.get('SA_QUERY_METADATA_TABLE_NAME', "query_metadata")
RUN_METADATA_TABLE_NAME = os.environ.get('SA_RUN_METADATA_TABLE_NAME', "run_metadata")
RESULTS_ALERTS_TABLE_NAME = environ.get('SA_RESULTS_ALERTS_TABLE_NAME', "alerts")
RESULTS_VIOLATIONS_TABLE_NAME = environ.get('SA_RESULTS_VIOLATIONS_TABLE_NAME', "violations")
QUERY_METADATA_TABLE_NAME = environ.get('SA_QUERY_METADATA_TABLE_NAME', "query_metadata")
RUN_METADATA_TABLE_NAME = environ.get('SA_RUN_METADATA_TABLE_NAME', "run_metadata")

# schemas
DATA_SCHEMA = os.environ.get('SA_DATA_SCHEMA', f"{DATABASE}.{DATA_SCHEMA_NAME}")
RULES_SCHEMA = os.environ.get('SA_RULES_SCHEMA', f"{DATABASE}.{RULES_SCHEMA_NAME}")
RESULTS_SCHEMA = os.environ.get('SA_RESULTS_SCHEMA', f"{DATABASE}.{RESULTS_SCHEMA_NAME}")
DATA_SCHEMA = environ.get('SA_DATA_SCHEMA', f"{DATABASE}.{DATA_SCHEMA_NAME}")
RULES_SCHEMA = environ.get('SA_RULES_SCHEMA', f"{DATABASE}.{RULES_SCHEMA_NAME}")
RESULTS_SCHEMA = environ.get('SA_RESULTS_SCHEMA', f"{DATABASE}.{RESULTS_SCHEMA_NAME}")

# tables
ALERTS_TABLE = os.environ.get('SA_ALERTS_TABLE', f"{RESULTS_SCHEMA}.{RESULTS_ALERTS_TABLE_NAME}")
VIOLATIONS_TABLE = os.environ.get('SA_VIOLATIONS_TABLE', f"{RESULTS_SCHEMA}.{RESULTS_VIOLATIONS_TABLE_NAME}")
QUERY_METADATA_TABLE = os.environ.get('SA_QUERY_METADATA_TABLE', f"{RESULTS_SCHEMA}.{QUERY_METADATA_TABLE_NAME}")
RUN_METADATA_TABLE = os.environ.get('SA_METADATA_RUN_TABLE', f"{RESULTS_SCHEMA}.{RUN_METADATA_TABLE_NAME}")
ALERTS_TABLE = environ.get('SA_ALERTS_TABLE', f"{RESULTS_SCHEMA}.{RESULTS_ALERTS_TABLE_NAME}")
VIOLATIONS_TABLE = environ.get('SA_VIOLATIONS_TABLE', f"{RESULTS_SCHEMA}.{RESULTS_VIOLATIONS_TABLE_NAME}")
QUERY_METADATA_TABLE = environ.get('SA_QUERY_METADATA_TABLE', f"{RESULTS_SCHEMA}.{QUERY_METADATA_TABLE_NAME}")
RUN_METADATA_TABLE = environ.get('SA_METADATA_RUN_TABLE', f"{RESULTS_SCHEMA}.{RUN_METADATA_TABLE_NAME}")

# misc
ALERT_QUERY_POSTFIX = "ALERT_QUERY"
Expand All @@ -35,7 +37,7 @@
VIOLATION_SQUELCH_POSTFIX = "VIOLATION_SUPPRESSION"

# enabling sends metrics to cloudwatch
CLOUDWATCH_METRICS = os.environ.get('CLOUDWATCH_METRICS', False)
CLOUDWATCH_METRICS = environ.get('CLOUDWATCH_METRICS', False)

CONFIG_VARS = [
('ALERTS_TABLE', ALERTS_TABLE),
Expand Down
80 changes: 42 additions & 38 deletions src/runners/helpers/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ def connect():
log.error(e, "Failed to connect.")


def fetch(ctx, query=None):
def fetch(ctx, query=None, fix_errors=True):
if query is None: # TODO(andrey): swap args and refactor
ctx, query = CACHED_CONNECTION, ctx

res = execute(ctx, query)
res = execute(ctx, query, fix_errors)
cols = [c[0] for c in res.description]
while True:
row = res.fetchone()
Expand All @@ -97,21 +97,23 @@ def fetch(ctx, query=None):


def execute(ctx, query=None, fix_errors=True):
# TODO(andrey): don't ignore errors by default
if query is None: # TODO(andrey): swap args and refactor
ctx, query = CACHED_CONNECTION, ctx

try:
return ctx.cursor().execute(query)

except snowflake.connector.errors.ProgrammingError as e:
if not fix_errors:
raise

if e.errno == int(MASTER_TOKEN_EXPIRED_GS_CODE):
connect(run_preflight_checks=False, flush_cache=True)
return execute(query)

log.error(e, f"Ignoring programming Error in query: {query}")
if not fix_errors:
log.debug(f"re-raising error '{e}' in query >{query}<")
raise

log.info(e, f"ignoring error '{e}' in query >{query}<")

return ctx.cursor().execute("SELECT 1 WHERE FALSE;")

Expand Down Expand Up @@ -151,18 +153,25 @@ def load_rules(ctx, postfix) -> List[str]:
return rules


INSERT_ALERTS_QUERY = f"""
INSERT INTO results.alerts (alert_time, event_time, alert)
SELECT PARSE_JSON(column1):ALERT_TIME
, PARSE_JSON(column1):EVENT_TIME
, PARSE_JSON(column1)
FROM VALUES {{values}}
"""


def sql_value_placeholders(n):
return ", ".join(["(%s)"] * n)


def insert_alerts(alerts, ctx=None):
if ctx is None:
ctx = CACHED_CONNECTION or connect()

ctx.cursor().execute(
(
f'INSERT INTO restuls.alerts (alert_time, event_time, alert) '
f'SELECT PARSE_JSON(column1):ALERT_TIME, PARSE_JSON(column1):EVENT_TIME, PARSE_JSON(column1) '
f'FROM values {", ".join(["(%s)"] * len(alerts))};'
),
alerts
)
query = INSERT_ALERTS_QUERY.format(values=sql_value_placeholders(len(alerts)))
return ctx.cursor().execute(query, alerts)


def insert_alerts_query_run(query_name, from_time_sql, to_time_sql='CURRENT_TIMESTAMP()', ctx=None):
Expand Down Expand Up @@ -202,16 +211,17 @@ def insert_alerts_query_run(query_name, from_time_sql, to_time_sql='CURRENT_TIME
IFNULL(
OBJECT_CONSTRUCT(*):IDENTITY,
OBJECT_CONSTRUCT(
'ENVIRONMENT', IFNULL(environment, PARSE_JSON('null')),
'OBJECT', IFNULL(object, PARSE_JSON('null')),
'TITLE', IFNULL(title, PARSE_JSON('null')),
'ALERT_TIME', IFNULL(alert_time, PARSE_JSON('null')),
'DESCRIPTION', IFNULL(description, PARSE_JSON('null')),
'EVENT_DATA', IFNULL(event_data, PARSE_JSON('null')),
'DETECTOR', IFNULL(detector, PARSE_JSON('null')),
'SEVERITY', IFNULL(severity, PARSE_JSON('null')),
'QUERY_ID', IFNULL(query_id, PARSE_JSON('null')),
'QUERY_NAME', IFNULL(query_name, PARSE_JSON('null'))
'ENVIRONMENT', IFNULL(OBJECT_CONSTRUCT(*):ENVIRONMENT, PARSE_JSON('null')),
'OBJECT', IFNULL(OBJECT_CONSTRUCT(*):OBJECT, PARSE_JSON('null')),
'OWNER', IFNULL(OBJECT_CONSTRUCT(*):OWNER, PARSE_JSON('null')),
'TITLE', IFNULL(OBJECT_CONSTRUCT(*):TITLE, PARSE_JSON('null')),
'ALERT_TIME', IFNULL(OBJECT_CONSTRUCT(*):ALERT_TIME, PARSE_JSON('null')),
'DESCRIPTION', IFNULL(OBJECT_CONSTRUCT(*):DESCRIPTION, PARSE_JSON('null')),
'EVENT_DATA', IFNULL(OBJECT_CONSTRUCT(*):EVENT_DATA, PARSE_JSON('null')),
'DETECTOR', IFNULL(OBJECT_CONSTRUCT(*):DETECTOR, PARSE_JSON('null')),
'SEVERITY', IFNULL(OBJECT_CONSTRUCT(*):SEVERITY, PARSE_JSON('null')),
'QUERY_ID', IFNULL(OBJECT_CONSTRUCT(*):QUERY_ID, PARSE_JSON('null')),
'QUERY_NAME', '{{query_name}}'
)
)
))
Expand All @@ -229,17 +239,11 @@ def insert_violations_query_run(query_name, ctx=None) -> Tuple[int, int]:

log.info(f"{query_name} processing...")
try:
try:
result = next(fetch(ctx, INSERT_VIOLATIONS_WITH_ID_QUERY.format(**locals())))
except Exception:
log.info('warning: missing STRING ID column in RESULTS.VIOLATIONS')
result = next(fetch(ctx, INSERT_VIOLATIONS_QUERY.format(**locals())))

num_rows_inserted = result['number of rows inserted']
log.info(f"{query_name} created {num_rows_inserted} rows, updated 0 rows.")
log.info(f"{query_name} done.")
return num_rows_inserted, 0

except Exception as e:
log.info(f"{query_name} run threw an exception:", e)
return 0, 0
result = next(fetch(INSERT_VIOLATIONS_WITH_ID_QUERY.format(**locals()), fix_errors=False))
except Exception:
log.info('warning: missing STRING ID column in RESULTS.VIOLATIONS')
result = next(fetch(INSERT_VIOLATIONS_QUERY.format(**locals()), fix_errors=False))

num_rows_inserted = result['number of rows inserted']
log.info(f"{query_name} created {num_rows_inserted} rows.")
return num_rows_inserted
18 changes: 9 additions & 9 deletions src/runners/helpers/dbconfig.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
from base64 import b64decode
import os
from os import environ

# database & account properties
REGION = os.environ.get('REGION', "us-west-2")
ACCOUNT = os.environ.get('SNOWFLAKE_ACCOUNT', '') + ('' if REGION == 'us-west-2' else f'.{REGION}')
REGION = environ.get('REGION', "us-west-2")
ACCOUNT = environ.get('SNOWFLAKE_ACCOUNT', '') + ('' if REGION == 'us-west-2' else f'.{REGION}')

USER = os.environ.get('SA_USER', "snowalert")
PRIVATE_KEY_PASSWORD = os.environ.get('PRIVATE_KEY_PASSWORD', '').encode('utf-8')
PRIVATE_KEY = b64decode(os.environ['PRIVATE_KEY']) if os.environ.get('PRIVATE_KEY') else None
USER = environ.get('SA_USER', "snowalert")
PRIVATE_KEY_PASSWORD = environ.get('PRIVATE_KEY_PASSWORD', '').encode('utf-8')
PRIVATE_KEY = b64decode(environ['PRIVATE_KEY']) if environ.get('PRIVATE_KEY') else None

ROLE = os.environ.get('SA_ROLE', "snowalert")
WAREHOUSE = os.environ.get('SA_WAREHOUSE', "snowalert")
DATABASE = os.environ.get('SA_DATABASE', "snowalert")
ROLE = environ.get('SA_ROLE', "snowalert")
WAREHOUSE = environ.get('SA_WAREHOUSE', "snowalert")
DATABASE = environ.get('SA_DATABASE', "snowalert")

# connection properties
TIMEOUT = 500
Loading

0 comments on commit c45c2ad

Please sign in to comment.