Skip to content

Commit

Permalink
feat: shortfall simulation tables
Browse files Browse the repository at this point in the history
**Motivation:**

Adds tables which simulates a bad debt event.  

**Modifications:**

Adds downstream table with simulation results. Mods to build and
bigquery_io_manager to support this.
  • Loading branch information
scottincrypto committed Jul 10, 2023
1 parent b1ce703 commit 6476127
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 10 deletions.
6 changes: 5 additions & 1 deletion .github/workflows/branch_deployments.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
name: Serverless Branch Deployments
on:
pull_request:
types: [opened, synchronize, reopened, closed]
# types: [opened, synchronize, reopened, closed]
branches:
- 'do_no_run' # manually disabled
concurrency:
# Cancel in-progress runs on same branch
group: ${{ github.ref }}
Expand All @@ -10,6 +12,8 @@ env:
DAGSTER_CLOUD_URL: "http://llama.dagster.cloud"
DAGSTER_CLOUD_API_TOKEN: ${{ secrets.DAGSTER_CLOUD_API_TOKEN }}
# ENABLE_FAST_DEPLOYS: 'true'
PYTHON_VERSION: '3.10'
DAGSTER_CLOUD_FILE: 'dagster_cloud.yaml'

jobs:
dagster_cloud_default_deploy:
Expand Down
103 changes: 103 additions & 0 deletions aave_data/assets/protocol/protocol_data_warehouse.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
AssetIn
)
from icecream import ic
from scipy.interpolate import CubicSpline

from aave_data.resources.financials_config import * #pylint: disable=wildcard-import, unused-wildcard-import

Expand Down Expand Up @@ -470,6 +471,108 @@ def balancer_bpt_by_day(

return bpt


@asset(
compute_kind="python",
code_version="1",
io_manager_key = 'data_warehouse_io_manager',
ins={
"aave_token_liquidity_depth": AssetIn(key_prefix="protocol_data_lake"),
"sm_staked_value_by_day": AssetIn(),
}
)
def shortfall_simulation(
context,
aave_token_liquidity_depth,
sm_staked_value_by_day
)-> pd.DataFrame:
"""
Uses the Paraswap to get the liquidity depth of the AAVE/USDC pair on the Ethereum mainnet
Used for the Safety Module Shortfall Event analysis
This asset is not partitioned and appends to the existing table when run.
This asset is not idempotent as the aggregator data is ephemeral
Uses a non-idempoent IO manager to write to the warehouse
Args:
context: dagster context object
aave_token_liquidity_depth: liquidity depth of aave token from price aggregator
sm_staked_value_by_day: value of safety module holdings
Returns:
Outcomes of the bad debt event simulation in a dataframe
"""

# get the sm staked value and aggregate by day
sm_totals = sm_staked_value_by_day.groupby(['block_day']).sum(numeric_only=True).reset_index()
sm_totals['sm_coverage'] = 0.3 * sm_totals.staked_value_usd

# get the liqduidity depth and aggregate by day (pick one if there are multiple entries)
liq_depth = aave_token_liquidity_depth
liq_depth['block_day'] = liq_depth.fetch_time.dt.floor('D')
min_fetch = liq_depth[['fetch_time','block_day']].groupby(['block_day']).min().reset_index()
min_fetch.rename(columns={'fetch_time':'min_fetch'}, inplace=True)
liq_depth = liq_depth.merge(min_fetch, how='left')
liq_depth = liq_depth.query('fetch_time == min_fetch')
liq_depth = liq_depth[['block_day','from_amount_usd','to_amount_usd','price_impact']]

# calulate the to_amount in WETH/AAVE given from_amount
liq_depth['to_amount_aaveweth'] = liq_depth.from_amount_usd * (1- liq_depth.price_impact) * 0.9 + liq_depth.from_amount_usd * 0.1
liq_depth['to_amount_aave_only'] = liq_depth.from_amount_usd * (1 - liq_depth.price_impact) * 0.9

# filter the liq depth so that to_amount(from_amount) is an increasing function
unique_dates = liq_depth.block_day.unique()
liq_depth_filtered = pd.DataFrame()
for date in unique_dates:
temp_df = liq_depth[liq_depth['block_day'] == date].copy()
temp_df['prev'] = temp_df.to_amount_aaveweth.shift(1).fillna(0)
temp_df['diff'] = temp_df.to_amount_aaveweth - temp_df.prev
temp_df_filt = temp_df.query('diff >= 0')
liq_depth_filtered = pd.concat([liq_depth_filtered, temp_df_filt])
liq_depth_filtered.drop(columns=['prev','diff'], inplace=True)

# build the splines
splines = liq_depth_filtered.groupby(['block_day']).apply(lambda x: CubicSpline(x.from_amount_usd, x.to_amount_aaveweth)).reset_index()
splines.columns = ['block_day','from_given_to']
rev_splines = liq_depth_filtered.groupby(['block_day']).apply(lambda x: CubicSpline(x.to_amount_aaveweth, x.from_amount_usd)).reset_index()
rev_splines.columns = ['block_day','to_given_from']
price_impact_splines = liq_depth_filtered.groupby(['block_day']).apply(lambda x: CubicSpline(x.from_amount_usd, x.price_impact)).reset_index()
price_impact_splines.columns = ['block_day','impact_given_aave']

# join the splines and the sm totals
df = splines.merge(sm_totals, how='left')
df = df.merge(rev_splines, how='left')
df = df.merge(price_impact_splines, how='left')

# find the max debt capacity able to be cleared by the given liquidity curve
df['max_cap'] = df.apply(lambda x: x.from_given_to(x.sm_coverage), axis=1).astype(float)

# create the dataframe with the bad debt range to sweep over
# bad_debt_range = range(1000000,100000000,1000000)
bad_debt_range = [
*[10**6*i for i in range(1, 100)],
*[10**8*i for i in range(1,6)],
]
bad_debt_df = pd.DataFrame(bad_debt_range)
bad_debt_df.columns = ['bad_debt']
df = df.merge(bad_debt_df, how='cross')
df['naive_sales'] = df.apply(lambda x: x.from_given_to(x.bad_debt), axis=1).astype(float)
df['sale_required_for_bad_debt'] = df.apply(lambda x: x.to_given_from(x.bad_debt), axis=1).astype(float)
df['aave_sold_usd'] = df.bad_debt * 0.9
df['aave_price_impact'] = df.apply(lambda x: x.impact_given_aave(x.aave_sold_usd), axis=1).astype(float)
df = df[['block_day','staked_value_usd','sm_coverage','max_cap','bad_debt','naive_sales','sale_required_for_bad_debt','aave_sold_usd','aave_price_impact']]

df['overhang'] = np.where(df.bad_debt < df.max_cap, 0, df.bad_debt - df.max_cap)

df = standardise_types(df)

return df




if __name__ == "__main__":

pass
Expand Down
13 changes: 4 additions & 9 deletions aave_data/resources/bigquery_io_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,10 @@ def _get_select_statement(
time_window: Optional[Tuple[datetime, datetime]] = None,
partition_key: Optional[str] = None,
):
# col_str = ", ".join(columns) if columns else "*"
# datamart tables don't have dagster metadata
if "datamart" in dataset:
return f"SELECT * FROM {dataset}.{table} WHERE 1=1"

excluded_cols = "_dagster_partition_type, _dagster_partition_key, _dagster_partition_time, _dagster_load_timestamp"
if partition_type == "time_window":
return f"SELECT * EXCEPT ({excluded_cols}) FROM {dataset}.{table} {self._time_window_where_clause(time_window)}"
Expand All @@ -312,14 +315,6 @@ def _get_select_statement(
else:
return f"SELECT * EXCEPT ({excluded_cols}) FROM {dataset}.{table} WHERE 1=1"

if time_window:
return (
# f"""SELECT {col_str} FROM {self._config["database"]}.{dataset}.{table}\n"""
f"""SELECT {col_str} FROM {dataset}.{table}\n"""
+ self._time_window_where_clause(time_window)
)
else:
return f"""SELECT {col_str} FROM {dataset}.{table}"""

def _time_window_where_clause(self, time_window: Tuple[datetime, datetime]) -> str:
start_dt, end_dt = time_window
Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"flipside==2.0.7", # API for accessing shroom Flipside Crypto data tables via SQL
# "web3==6.0.0", # installed in dagster_cloud_post_install.sh due to version clash with multicall
# "multicall==0.7.1" # installed in dagster_cloud_post_install.sh due to version clash with web3
"scipy"
],
extras_require={"dev": ["dagit", "pytest"]},
)

0 comments on commit 6476127

Please sign in to comment.