diff --git a/.github/workflows/branch_deployments.yml b/.github/workflows/branch_deployments.yml
index 6f1802d..b5abffd 100644
--- a/.github/workflows/branch_deployments.yml
+++ b/.github/workflows/branch_deployments.yml
@@ -10,6 +10,8 @@ env:
   DAGSTER_CLOUD_URL: "http://llama.dagster.cloud"
   DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}
   # ENABLE_FAST_DEPLOYS: 'true'
+  PYTHON_VERSION: '3.10'
+  DAGSTER_CLOUD_FILE: 'dagster_cloud.yaml'
 
 jobs:
   dagster_cloud_default_deploy:
diff --git a/aave_data/assets/protocol/protocol_data_warehouse.py b/aave_data/assets/protocol/protocol_data_warehouse.py
index 77d2e2d..233e249 100644
--- a/aave_data/assets/protocol/protocol_data_warehouse.py
+++ b/aave_data/assets/protocol/protocol_data_warehouse.py
@@ -15,6 +15,7 @@
     AssetIn
 )
 from icecream import ic
+from scipy.interpolate import CubicSpline
 
 from aave_data.resources.financials_config import * #pylint: disable=wildcard-import, unused-wildcard-import
@@ -470,6 +471,110 @@ def balancer_bpt_by_day(
     return bpt
 
+
+@asset(
+    compute_kind="python",
+    code_version="1",
+    io_manager_key='data_warehouse_io_manager',
+    ins={
+        "aave_token_liquidity_depth": AssetIn(key_prefix="protocol_data_lake"),
+        "sm_staked_value_by_day": AssetIn(),
+    }
+)
+def shortfall_simulation(
+    context,
+    aave_token_liquidity_depth,
+    sm_staked_value_by_day
+) -> pd.DataFrame:
+    """
+    Uses Paraswap liquidity depth data for the AAVE/USDC pair on Ethereum mainnet.
+
+    Used for the Safety Module Shortfall Event analysis.
+
+    This asset is not partitioned and appends to the existing table when run.
+    This asset is not idempotent as the aggregator data is ephemeral.
+
+    Uses a non-idempotent IO manager to write to the warehouse.
+
+    Args:
+        context: dagster context object
+        aave_token_liquidity_depth: liquidity depth of the AAVE token from the price aggregator
+        sm_staked_value_by_day: value of Safety Module holdings
+
+    Returns:
+        Outcomes of the bad debt event simulation in a dataframe
+    """
+
+    # get the sm staked value and aggregate by day
+    sm_totals = sm_staked_value_by_day.groupby(['block_day']).sum(numeric_only=True).reset_index()
+    sm_totals['sm_coverage'] = 0.3 * sm_totals.staked_value_usd
+
+    # get the liquidity depth by day (pick the earliest fetch if there are multiple per day)
+    liq_depth = aave_token_liquidity_depth
+    liq_depth['block_day'] = liq_depth.fetch_time.dt.floor('D')
+    min_fetch = liq_depth[['fetch_time','block_day']].groupby(['block_day']).min().reset_index()
+    min_fetch.rename(columns={'fetch_time':'min_fetch'}, inplace=True)
+    liq_depth = liq_depth.merge(min_fetch, how='left')
+    liq_depth = liq_depth.query('fetch_time == min_fetch')
+    liq_depth = liq_depth[['block_day','from_amount_usd','to_amount_usd','price_impact']]
+
+    # calculate the to_amount in WETH/AAVE given from_amount (90% AAVE takes the price impact, 10% WETH does not)
+    liq_depth['to_amount_aaveweth'] = liq_depth.from_amount_usd * (1 - liq_depth.price_impact) * 0.9 + liq_depth.from_amount_usd * 0.1
+    liq_depth['to_amount_aave_only'] = liq_depth.from_amount_usd * (1 - liq_depth.price_impact) * 0.9
+
+    # filter the liq depth so that to_amount(from_amount) is an increasing function
+    unique_dates = liq_depth.block_day.unique()
+    liq_depth_filtered = pd.DataFrame()
+    for date in unique_dates:
+        temp_df = liq_depth[liq_depth['block_day'] == date].copy()
+        temp_df['prev'] = temp_df.to_amount_aaveweth.shift(1).fillna(0)
+        temp_df['diff'] = temp_df.to_amount_aaveweth - temp_df.prev
+        temp_df_filt = temp_df.query('diff > 0')
+        liq_depth_filtered = pd.concat([liq_depth_filtered, temp_df_filt])
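+    # note: scipy's CubicSpline requires strictly increasing inputs; the single
+    # pass above drops flat or falling quotes so the reverse (to -> from) spline
+    # can be fitted, assuming dips in the aggregator quotes are isolated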
+    liq_depth_filtered.drop(columns=['prev','diff'], inplace=True)
+
+    # build the splines (per day: to_amount as a function of from_amount, and the inverse)
+    splines = liq_depth_filtered.groupby(['block_day']).apply(lambda x: CubicSpline(x.from_amount_usd, x.to_amount_aaveweth)).reset_index()
+    splines.columns = ['block_day','from_given_to']
+    rev_splines = liq_depth_filtered.groupby(['block_day']).apply(lambda x: CubicSpline(x.to_amount_aaveweth, x.from_amount_usd)).reset_index()
+    rev_splines.columns = ['block_day','to_given_from']
+    price_impact_splines = liq_depth_filtered.groupby(['block_day']).apply(lambda x: CubicSpline(x.from_amount_usd, x.price_impact)).reset_index()
+    price_impact_splines.columns = ['block_day','impact_given_aave']
+
+    # join the splines and the sm totals
+    df = splines.merge(sm_totals, how='left')
+    df = df.merge(rev_splines, how='left')
+    df = df.merge(price_impact_splines, how='left')
+
+    # find the max debt capacity able to be cleared by the given liquidity curve
+    df['max_cap'] = df.apply(lambda x: x.from_given_to(x.sm_coverage), axis=1).astype(float)
+
+    # create the dataframe with the bad debt range to sweep over
+    # bad_debt_range = range(1000000,100000000,1000000)
+    bad_debt_range = [
+        *[10**6*i for i in range(1, 100)],
+        *[10**8*i for i in range(1, 6)],
+    ]
+    bad_debt_df = pd.DataFrame(bad_debt_range)
+    bad_debt_df.columns = ['bad_debt']
+    df = df.merge(bad_debt_df, how='cross')
+    df['naive_sales'] = df.apply(lambda x: x.from_given_to(x.bad_debt), axis=1).astype(float)
+    df['sale_required_for_bad_debt'] = df.apply(lambda x: x.to_given_from(x.bad_debt), axis=1).astype(float)
+    df['aave_sold_usd'] = df.bad_debt * 0.9
+    df['aave_price_impact'] = df.apply(lambda x: x.impact_given_aave(x.aave_sold_usd), axis=1).astype(float)
+    df = df[['block_day','staked_value_usd','sm_coverage','max_cap','bad_debt','naive_sales','sale_required_for_bad_debt','aave_sold_usd','aave_price_impact']]
+
+    df['overhang'] = np.where(df.bad_debt < df.max_cap, 0, df.bad_debt - df.max_cap)
+
+    df = standardise_types(df)
+
+    return df
+
+
+
 
 if __name__ == "__main__":
     pass
 
diff --git a/aave_data/resources/bigquery_io_manager.py b/aave_data/resources/bigquery_io_manager.py
index ac20093..bc615b4 100644
--- a/aave_data/resources/bigquery_io_manager.py
+++ b/aave_data/resources/bigquery_io_manager.py
@@ -303,7 +303,12 @@ def _get_select_statement(
         time_window: Optional[Tuple[datetime, datetime]] = None,
         partition_key: Optional[str] = None,
     ):
-        # col_str = ", ".join(columns) if columns else "*"
+        # datamart tables don't have dagster metadata
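+        # (the _dagster_* partition and load-timestamp columns excluded below
+        # are never present on those tables, so a plain SELECT * is safe)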
+        if "datamart" in dataset:
+            return f"SELECT * FROM {dataset}.{table} WHERE 1=1"
+
         excluded_cols = "_dagster_partition_type, _dagster_partition_key, _dagster_partition_time, _dagster_load_timestamp"
         if partition_type == "time_window":
             return f"SELECT * EXCEPT ({excluded_cols}) FROM {dataset}.{table} {self._time_window_where_clause(time_window)}"
@@ -312,14 +317,6 @@ def _get_select_statement(
         else:
             return f"SELECT * EXCEPT ({excluded_cols}) FROM {dataset}.{table} WHERE 1=1"
 
-        if time_window:
-            return (
-                # f"""SELECT {col_str} FROM {self._config["database"]}.{dataset}.{table}\n"""
-                f"""SELECT {col_str} FROM {dataset}.{table}\n"""
-                + self._time_window_where_clause(time_window)
-            )
-        else:
-            return f"""SELECT {col_str} FROM {dataset}.{table}"""
 
     def _time_window_where_clause(self, time_window: Tuple[datetime, datetime]) -> str:
         start_dt, end_dt = time_window
diff --git a/setup.py b/setup.py
index b2e7f7e..03e26fb 100644
--- a/setup.py
+++ b/setup.py
@@ -45,6 +45,7 @@
         "flipside==2.0.7", # API for accessing shroom Flipside Crypto data tables via SQL
         # "web3==6.0.0", # installed in dagster_cloud_post_install.sh due to version clash with multicall
         # "multicall==0.7.1" # installed in dagster_cloud_post_install.sh due to version clash with web3
+        "scipy",  # cubic spline interpolation for the shortfall simulation asset
     ],
     extras_require={"dev": ["dagit", "pytest"]},
 )
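
For reference, a minimal sketch of the spline round-trip that shortfall_simulation relies on. The quote numbers and variable names here are made up for illustration; only CubicSpline comes from the diff above, and both input columns must be strictly increasing, which is what the asset's filtering step guarantees:

import pandas as pd
from scipy.interpolate import CubicSpline

# hypothetical liquidity depth quotes for one day: USD sold into the market
# (from_amount) vs USD received back (to_amount), both strictly increasing
quotes = pd.DataFrame({
    "from_amount_usd":    [1e6, 5e6, 1e7, 5e7, 1e8],
    "to_amount_aaveweth": [9.9e5, 4.8e6, 9.3e6, 4.1e7, 7.2e7],
})

# forward curve: proceeds received for a given sale size
depth_curve = CubicSpline(quotes.from_amount_usd, quotes.to_amount_aaveweth)
# fitted inverse: sale size needed to realise a given level of proceeds
inverse_depth_curve = CubicSpline(quotes.to_amount_aaveweth, quotes.from_amount_usd)

bad_debt = 2.5e7  # USD of bad debt to clear in the simulated shortfall event
sale_required = float(inverse_depth_curve(bad_debt))  # stake that must be sold
proceeds = float(depth_curve(sale_required))          # round-trips to ~bad_debt

print(f"sell ~${sale_required:,.0f} to clear ${bad_debt:,.0f} of bad debt "
      f"(check: ~${proceeds:,.0f} recovered)")

The asset builds one such pair of splines per block_day, sweeps bad_debt over the range defined in the diff, and reports as overhang the portion of bad debt the staked coverage cannot clear.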