"""
~40,000 distinct IP addresses were recorded "downloading a file" in Q1 of 2024.
Using the http://ip-api.com/batch API, I was able to extract the information for
all ~40,000 IP addresses. This information contains city, country, region, etc.,
which is then stored back into Snowflake for analysis.

This could eventually become an Airflow DAG, so I'm not going to spend too much
time on it.
"""

import time

import backoff
from dotenv import dotenv_values
import numpy as np
import pandas as pd
import requests
import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas

# Track the start and end dates for extracting IP addresses to know when to execute the code
RECORD_START_DATE = "2024-03-01"
RECORD_END_DATE = "2024-05-25"


@backoff.on_exception(
    backoff.expo, requests.exceptions.RequestException, max_tries=8, jitter=None
)
def get_ip_info(ip_list: list) -> list:
    """Get IP information from ip-api.com: http://ip-api.com/batch

    Args:
        ip_list (list): list of IP addresses

    Returns:
        list: per-IP information like city, country, region, lat, lon, asn, etc.
    """
    # Set a timeout so a hung connection surfaces as a RequestException
    # that the backoff decorator can retry
    ip_info_response = requests.post("http://ip-api.com/batch", json=ip_list, timeout=30)
    return ip_info_response.json()
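

# A rough sketch of the per-IP payload this script relies on, inferred from the
# fields referenced below ("status", "message", "query", "as") plus the docstring
# above; the exact keys come from ip-api.com, so treat this as illustrative only:
#   {"status": "success", "query": "8.8.8.8", "country": "...", "region": "...",
#    "city": "...", "lat": ..., "lon": ..., "as": "..."}
# A failed lookup instead carries {"status": "fail", "message": "...", "query": "..."}.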


def batch_get_ip_info(unique_ips: pd.DataFrame, batch_size: int = 100) -> pd.DataFrame:
    """
    Retrieves information for unique IP addresses in batches.
    The function sleeps for 2.5 seconds between batches to avoid exceeding the API rate limit.

    Args:
        unique_ips (pd.DataFrame): A DataFrame containing the unique IP addresses.
        batch_size (int): The size of each batch. Max batch size is 100.

    Returns:
        pd.DataFrame: A DataFrame of IP information
    """
    result: list = []
    for batch_number, batch_df in unique_ips.groupby(
        np.arange(len(unique_ips)) // batch_size
    ):
        # Simple progress indicator
        print(batch_number)
        ip_list: list = get_ip_info(batch_df["UNIQUE_IPS"].to_list())
        # The batch endpoint has a rate limit of 15 requests per minute,
        # so sleep between batches to avoid getting throttled
        time.sleep(2.5)
        result.extend(ip_list)
    ip_info_df = pd.DataFrame(result)
    return ip_info_df
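

# A minimal usage sketch (the UNIQUE_IPS column name matches the alias produced
# by the query in main() below; the addresses here are just placeholders):
#
#   unique_ips = pd.DataFrame({"UNIQUE_IPS": ["8.8.8.8", "1.1.1.1"]})
#   ip_info_df = batch_get_ip_info(unique_ips=unique_ips, batch_size=100)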


def main():
    """Extract IP information for new unique IP addresses and load it into Snowflake."""
    config = dotenv_values("../.env")
    ctx = snowflake.connector.connect(
        user=config["user"],
        password=config["password"],
        account=config["snowflake_account"],
        database="synapse_data_warehouse",
        schema="synapse",
        role="SYSADMIN",
        warehouse="compute_xsmall",
    )
    cs = ctx.cursor()

    # Pull distinct IP addresses in the date range that have not already been extracted
    query = f"""
    select
        distinct x_forwarded_for as unique_ips
    from
        synapse_data_warehouse.synapse.processedaccess
    where
        x_forwarded_for is not null and
        x_forwarded_for not in (select ip from sage.audit.extracted_ip_info) and
        record_date BETWEEN DATE('{RECORD_START_DATE}') and DATE('{RECORD_END_DATE}');
    """
    cs.execute(query)
    unique_ips = cs.fetch_pandas_all()
    ip_info_df = batch_get_ip_info(unique_ips=unique_ips)

    succeeded_ip_info = ip_info_df[ip_info_df["status"] == "success"].copy()
    # These columns do not add value in a Snowflake query
    succeeded_ip_info = succeeded_ip_info.drop(columns=["status", "message"])

    # Rename columns to be more descriptive or to avoid SQL keywords ("as")
    succeeded_ip_info = succeeded_ip_info.rename(columns={"query": "ip", "as": "asn"})
    succeeded_ip_info.to_csv("ip_info.csv", index=False)

    write_pandas(
        conn=ctx,
        df=succeeded_ip_info,
        table_name="extracted_ip_info",
        database="SAGE",
        schema="AUDIT",
        auto_create_table=True,
        # overwrite=True,
        quote_identifiers=False,
    )
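    # write_pandas returns a (success, num_chunks, num_rows, output) tuple (per the
    # Snowflake connector docs) that could be checked here, e.g.:
    #   success, _, num_rows, _ = write_pandas(...)
    #   print(f"Loaded {num_rows} rows, success={success}")
    # Left unchecked to keep this one-off script short.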
    ctx.close()


if __name__ == "__main__":
    main()