diff --git a/README.md b/README.md
index 85defe80..d9e8ac2b 100644
--- a/README.md
+++ b/README.md
@@ -1,4 +1,4 @@
-# CoW Protocol: Solver Reimbursement & Rewards Distributor
+# CoW Protocol: Solver Accounting
 
 [![Code style: black](https://img.shields.io/badge/code%20style-black-000000.svg)](https://github.com/psf/black)
 
@@ -11,13 +11,19 @@ cp .env.sample .env        <----- Copy your Dune and orderbook credentials here!
 
 Fill out your Dune credentials in the `.env` file.
 
-Generate the solver-payouts with for the accounting period 7 days with today as end date).
+Generate the solver payouts for an accounting period of 7 days (with today as the end date).
 
 ```shell
 python -m src.fetch.transfer_file
 ```
 
-For more advanced usage of this payout script see below.
+To generate order data for the current month to upload to Dune, run the following command.
+
+```shell
+python -m src.data_sync.sync_data --sync-table order_data
+```
+
+For more advanced usage of these scripts see below.
 
 # Summary of Accounting Procedure
@@ -150,6 +156,18 @@ docker run --pull=always -it --rm \
 
 and (usually after about 30 seconds) find the transfer file written to your current working directory.
 
-### Managing Dependencies
+# Creating order and batch data for syncing
+
+The script `src/data_sync/sync_data.py` creates tables for syncing to Dune. It can be called with a table to sync, start and end times, and a flag for dropping old data.
+
+To create order data tables with data from `2024-12-30` to `2025-01-02` (the end time is exclusive), use
+```shell
+python -m src.data_sync.sync_data --sync-table order_data --start-time 2024-12-30 --end-time 2025-01-02
+```
+This will update (or create, if they do not exist yet) the tables `order_data_{NETWORK}_2024_12` and `order_data_{NETWORK}_2025_01`.
+
+The script requires the additional environment variable `ANALYTICS_DB_URL`.
+
+# Managing Dependencies
 
 Python libraries can be added to the `requirements.in` file. After this `pip-compile` or `python -m pip-compile` will update the `requirements.txt` for you (you may have to install the library manually first). Warning: this might take a long time for large changes or when you run pip-compile for the first time. Running the command with the `-v` flag can help keep track of what's happening.
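The two tables in the README example above come from the month partitioning implemented in `src/data_sync/common.py` (see the diff below). A minimal sketch of how `partition_time_range` splits the example range, assuming the command is run from the repo root with dependencies installed (the expected output is inferred from the function's docstring):

```python
# Sketch: how the 2024-12-30..2025-01-02 range from the README example is
# split at month boundaries by partition_time_range (src/data_sync/common.py).
from datetime import datetime, timezone

from src.data_sync.common import partition_time_range

start = datetime(2024, 12, 30, tzinfo=timezone.utc)
end = datetime(2025, 1, 2, tzinfo=timezone.utc)

# Expected: [(2024-12-30, 2025-01-01), (2025-01-01, 2025-01-02)],
# i.e. one piece per month, hence the two tables
# order_data_{NETWORK}_2024_12 and order_data_{NETWORK}_2025_01.
for piece_start, piece_end in partition_time_range(start, end):
    print(piece_start, piece_end)
```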
diff --git a/requirements.in b/requirements.in
index 7c3c2d4c..174689af 100644
--- a/requirements.in
+++ b/requirements.in
@@ -14,6 +14,7 @@ pandas==2.0.3
 pandas-stubs==2.0.2.230605
 numpy==1.26.4
 pip-tools==7.4.1
+python-dateutil>=2.9.0.post0
 black
 mypy
 pylint
diff --git a/requirements.txt b/requirements.txt
index c0407ffd..b1224b40 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
 #
-# This file is autogenerated by pip-compile with Python 3.10
+# This file is autogenerated by pip-compile with Python 3.12
 # by the following command:
 #
 #    pip-compile
@@ -15,8 +15,6 @@ aiosignal==1.3.1
     # via aiohttp
 astroid==3.2.4
     # via pylint
-async-timeout==4.0.3
-    # via aiohttp
 attrs==24.2.0
     # via
     #   aiohttp
@@ -106,8 +104,6 @@ eth-utils==4.1.1
     #   rlp
     #   trie
     #   web3
-exceptiongroup==1.2.2
-    # via pytest
 frozenlist==1.4.1
     # via
     #   aiohttp
@@ -205,6 +201,7 @@ pytest==8.3.2
     # via -r requirements.in
 python-dateutil==2.9.0.post0
     # via
+    #   -r requirements.in
     #   dune-client
     #   pandas
 python-dotenv==1.0.1
@@ -253,14 +250,6 @@ sqlalchemy==1.4.53
     # via -r requirements.in
 sqlalchemy-stubs==0.4
     # via -r requirements.in
-tomli==2.0.1
-    # via
-    #   black
-    #   build
-    #   mypy
-    #   pip-tools
-    #   pylint
-    #   pytest
 tomlkit==0.13.2
     # via pylint
 toolz==0.12.1
@@ -285,9 +274,6 @@ types-setuptools==73.0.0.20240822
     # via dune-client
 typing-extensions==4.12.2
     # via
-    #   astroid
-    #   black
-    #   eth-rlp
     #   eth-typing
     #   mypy
     #   sqlalchemy-stubs
diff --git a/src/data_sync/common.py b/src/data_sync/common.py
index e072c779..a04e192f 100644
--- a/src/data_sync/common.py
+++ b/src/data_sync/common.py
@@ -1,13 +1,78 @@
 """Shared methods between both sync scripts."""
 
 from datetime import datetime, timezone
-from typing import List, Tuple
+
+from dateutil.relativedelta import relativedelta
 from web3 import Web3
+
 from src.logger import set_log
+from src.models.block_range import BlockRange
 
 log = set_log(__name__)
 
 
+def partition_time_range(
+    start_time: datetime, end_time: datetime
+) -> list[tuple[datetime, datetime]]:
+    """Computes a list of time ranges from the input parameters.
+    If both times are from the same month, only [(start_time, end_time)] is returned.
+    Otherwise, the range is split into n pieces of the form [(start_time, start_of_month_2),
+    (start_of_month_2, start_of_month_3),..., (start_of_month_n, end_time)].
+    """
+    assert start_time < end_time, "start_time must be strictly smaller than end_time"
+
+    # if there is just one month to consider
+    if end_time <= datetime(start_time.year, start_time.month, 1).replace(
+        tzinfo=timezone.utc
+    ) + relativedelta(months=1):
+        return [(start_time, end_time)]
+
+    # if there are multiple months to consider
+    next_month_start_time = datetime(start_time.year, start_time.month, 1).replace(
+        tzinfo=timezone.utc
+    ) + relativedelta(months=1)
+    time_range_list = [(start_time, next_month_start_time)]
+    while end_time > next_month_start_time + relativedelta(months=1):
+        time_range_list.append(
+            (next_month_start_time, next_month_start_time + relativedelta(months=1))
+        )
+        next_month_start_time = next_month_start_time + relativedelta(months=1)
+    time_range_list.append((next_month_start_time, end_time))
+
+    return time_range_list
+
+
+def compute_block_range(
+    start_time: datetime, end_time: datetime, node: Web3
+) -> BlockRange:
+    """Computes a block range from start and end time.
+    The time range is inclusive of start_time and exclusive of end_time, while the
+    returned block range is inclusive on both ends. Only finalized blocks are considered.
+    """
+    latest_block = node.eth.get_block("finalized")
+    latest_block_time = datetime.fromtimestamp(
+        latest_block["timestamp"], tz=timezone.utc
+    )
+
+    assert (
+        start_time < latest_block_time
+    ), "start time must be smaller than latest block time"
+
+    start_block = find_block_with_timestamp(node, start_time.timestamp())
+    if latest_block_time < end_time:
+        log.info(
+            f"Latest finalized block time {latest_block_time} is smaller than {end_time}. "
+            "Using latest finalized block."
+        )
+        end_block = int(latest_block["number"])
+    else:
+        end_block = find_block_with_timestamp(node, end_time.timestamp()) - 1
+
+    assert start_block < end_block, "start block must be smaller than end block"
+
+    return BlockRange(block_from=start_block, block_to=end_block)
+
+
 def find_block_with_timestamp(node: Web3, time_stamp: float) -> int:
     """
     This implements binary search and returns the smallest block number
@@ -40,77 +105,3 @@ def find_block_with_timestamp(node: Web3, time_stamp: float) -> int:
     # fallback in case correct block number hasn't been found
     # in that case, we will include some more blocks than necessary
     return mid_block_number + 200
-
-
-def compute_block_and_month_range(  # pylint: disable=too-many-locals
-    node: Web3, recompute_previous_month: bool
-) -> Tuple[List[Tuple[int, int]], List[str]]:
-    """
-    This determines the block range and the relevant months
-    for which we will compute and upload data on Dune.
-    """
-    # The function first a list of block ranges, followed by a list of
-    # # months. Block ranges are stored as (start_block, end_block) pairs,
-    # and are meant to be interpreted as closed intervals.
-    # Moreover, we assume that the job runs at least once every 24h
-    # Because of that, if it is the first day of month, we also
-    # compute the previous month's table just to be on the safe side
-
-    latest_finalized_block = node.eth.get_block("finalized")
-
-    current_month_end_block = int(latest_finalized_block["number"])
-    current_month_end_timestamp = latest_finalized_block["timestamp"]
-
-    current_month_end_datetime = datetime.fromtimestamp(
-        current_month_end_timestamp, tz=timezone.utc
-    )
-    current_month_start_datetime = datetime(
-        current_month_end_datetime.year, current_month_end_datetime.month, 1, 00, 00
-    )
-    current_month_start_timestamp = current_month_start_datetime.replace(
-        tzinfo=timezone.utc
-    ).timestamp()
-
-    current_month_start_block = find_block_with_timestamp(
-        node, current_month_start_timestamp
-    )
-
-    current_month = (
-        f"{current_month_end_datetime.year}_{current_month_end_datetime.month}"
-    )
-    ## in case the month is 1-9, we add a "0" prefix, so that we have a fixed-length representation
-    ## e.g., 2024-12, 2024-01
-    if len(current_month) < 7:
-        current_month = current_month[:5] + "0" + current_month[5]
-    months_list = [current_month]
-    block_range = [(current_month_start_block, current_month_end_block)]
-    if current_month_end_datetime.day == 1 or recompute_previous_month:
-        if current_month_end_datetime.month == 1:
-            previous_month = f"{current_month_end_datetime.year - 1}_12"
-            previous_month_start_datetime = datetime(
-                current_month_end_datetime.year - 1, 12, 1, 00, 00
-            )
-        else:
-            previous_month = f"""{current_month_end_datetime.year}_
-            {current_month_end_datetime.month - 1}
-            """
-            if len(previous_month) < 7:
-                previous_month = previous_month[:5] + "0" + previous_month[5]
-            previous_month_start_datetime = datetime(
-                current_month_end_datetime.year,
-                current_month_end_datetime.month - 1,
-                1,
-                00,
-                00,
-            )
-        months_list.append(previous_month)
-        previous_month_start_timestamp = previous_month_start_datetime.replace(
-            tzinfo=timezone.utc
-        ).timestamp()
-        previous_month_start_block = find_block_with_timestamp(
-            node, previous_month_start_timestamp
-        )
-        previous_month_end_block = current_month_start_block - 1
-        block_range.append((previous_month_start_block, previous_month_end_block))
-
-    return block_range, months_list
diff --git a/src/data_sync/sync_data.py b/src/data_sync/sync_data.py
index a55a732b..b499e516 100644
--- a/src/data_sync/sync_data.py
+++ b/src/data_sync/sync_data.py
@@ -2,16 +2,22 @@
 import argparse
 import asyncio
+import datetime
 import os
 from dataclasses import dataclass
+
+from dateutil.relativedelta import relativedelta
 from dotenv import load_dotenv
 from web3 import Web3
-from src.fetch.orderbook import OrderbookFetcher, OrderbookEnv
+
+from src.fetch.orderbook import OrderbookFetcher
 from src.config import AccountingConfig, Network, web3
 from src.logger import set_log
 from src.models.tables import SyncTable
-from src.data_sync.common import compute_block_and_month_range
-from src.models.block_range import BlockRange
+from src.data_sync.common import (
+    compute_block_range,
+    partition_time_range,
+)
 
 log = set_log(__name__)
 
@@ -22,6 +28,9 @@ class ScriptArgs:
     """Runtime arguments' parser/initializer"""
 
     sync_table: SyncTable
+    recreate_table: bool
+    start_time: datetime.datetime
+    end_time: datetime.datetime
 
     def __init__(self) -> None:
         parser = argparse.ArgumentParser("Dune Community Sources Sync")
@@ -31,50 +40,85 @@ def __init__(self) -> None:
             required=True,
             choices=list(SyncTable),
         )
+        parser.add_argument(
+            "--start-time",
+            type=datetime.datetime.fromisoformat,
+            default=None,
+            help=(
+                "Start of the time range for syncing (inclusive). If not set, it defaults to the "
+                "beginning of the current month."
+            ),
+        )
+        parser.add_argument(
+            "--end-time",
+            type=datetime.datetime.fromisoformat,
+            default=None,
+            help=(
+                "End of the time range for syncing (exclusive). If not set, it defaults to the "
+                "beginning of the next month."
+            ),
+        )
+        parser.add_argument(
+            "--recreate-table",
+            action="store_true",
+            help="If set, tables are dropped and recreated. Otherwise, tables are upserted.",
+        )
         arguments, _ = parser.parse_known_args()
         self.sync_table: SyncTable = arguments.sync_table
+        self.recreate_table: bool = arguments.recreate_table
+
+        # parse time arguments
+        current_time = datetime.datetime.now(datetime.timezone.utc)
+        if arguments.start_time is None:
+            # default start time is the first of the month
+            self.start_time = datetime.datetime(
+                current_time.year, current_time.month, 1
+            )
+            log.info(f"No start time set, using beginning of month {self.start_time}.")
+        else:
+            self.start_time = arguments.start_time
+        if arguments.end_time is None:
+            # default end time (exclusive) is the start of the next month
+            self.end_time = datetime.datetime(
+                current_time.year, current_time.month, 1
+            ) + relativedelta(months=1)
+            log.info(
+                f"No end time set, using beginning of next month {self.end_time}."
+            )
+        else:
+            self.end_time: datetime.datetime = arguments.end_time
+        self.start_time = self.start_time.replace(tzinfo=datetime.timezone.utc)
+        self.end_time = self.end_time.replace(tzinfo=datetime.timezone.utc)
 
 
 async def sync_data_to_db(  # pylint: disable=too-many-arguments
     type_of_data: str,
     node: Web3,
     orderbook: OrderbookFetcher,
-    network: str,
     config: AccountingConfig,
-    recompute_previous_month: bool,
+    start_time: datetime.datetime,
+    end_time: datetime.datetime,
+    recreate_table: bool,
 ) -> None:
-    """
-    Order/Batch data Sync Logic. The recompute_previous_month flag, when enabled,
-    forces a recomputation of the previous month. If it is set to False, previous month
-    is still recomputed when the current date is the first day of the current month.
-    """
-
-    block_range_list, months_list = compute_block_and_month_range(
-        node, recompute_previous_month
-    )
-    # we note that the block range computed above is meant to be interpreted as
-    # a closed interval
-    for i, (start_block, end_block) in enumerate(block_range_list):
-        network_name = "ethereum" if network == "mainnet" else network
-        table_name = type_of_data + "_data_" + network_name + "_" + months_list[i]
-        block_range = BlockRange(block_from=start_block, block_to=end_block)
-        log.info(
-            f"About to process block range ({start_block}, {end_block}) for month {months_list[i]}"
-        )
+    """Order and Batch data Sync Logic."""
+    # partition time range into time ranges restricted to single months
+    time_range_list = partition_time_range(start_time, end_time)
+    for start_time_month, end_time_month in time_range_list:
+        month = start_time_month.strftime("%Y_%m")
+        network_name = config.dune_config.dune_blockchain
+        table_name = type_of_data + "_data_" + network_name + "_" + month
+        block_range = compute_block_range(start_time_month, end_time_month, node)
+        log.info(f"About to process {block_range} for month {month}")
         if type_of_data == "batch":
             data = orderbook.get_batch_data(block_range, config)
         else:
             data = orderbook.get_order_data(block_range, config)
         log.info("SQL query successfully executed. About to update analytics table.")
-        data.to_sql(
-            table_name,
-            OrderbookFetcher.pg_engine(OrderbookEnv.ANALYTICS),
-            if_exists="replace",
-            index=False,
-        )
+        orderbook.write_data(type_of_data, data, table_name, recreate_table)
+
         log.info(
-            f"{type_of_data} data sync run completed successfully for month {months_list[i]}"
+            f"{type_of_data} data sync run completed successfully for month {month}."
         )
@@ -89,30 +133,25 @@ def sync_data() -> None:
     config = AccountingConfig.from_network(Network(os.environ["NETWORK"]))
     log.info(f"Network is set to: {network}")
 
-    if args.sync_table == SyncTable.BATCH_DATA:
-        asyncio.run(
-            sync_data_to_db(
-                "batch",
-                web3,
-                orderbook,
-                network,
-                config,
-                recompute_previous_month=False,
-            )
+    match args.sync_table:
+        case SyncTable.BATCH_DATA:
+            type_of_data = "batch"
+        case SyncTable.ORDER_DATA:
+            type_of_data = "order"
+        case _:
+            raise ValueError(f"unsupported sync_table '{args.sync_table}'")
+
+    asyncio.run(
+        sync_data_to_db(
+            type_of_data,  # "batch" or "order", derived from args.sync_table above
+            web3,
+            orderbook,
+            config,
+            args.start_time,
+            args.end_time,
+            args.recreate_table,
         )
-    elif args.sync_table == SyncTable.ORDER_DATA:
-        asyncio.run(
-            sync_data_to_db(
-                "order",
-                web3,
-                orderbook,
-                network,
-                config,
-                recompute_previous_month=False,
-            )
-        )
-    else:
-        log.error(f"unsupported sync_table '{args.sync_table}'")
+    )
 
 
 if __name__ == "__main__":
diff --git a/src/fetch/orderbook.py b/src/fetch/orderbook.py
index 3edd5138..676347a0 100644
--- a/src/fetch/orderbook.py
+++ b/src/fetch/orderbook.py
@@ -10,7 +10,7 @@
 
 import pandas as pd
 from dotenv import load_dotenv
-from pandas import DataFrame
+from pandas import DataFrame, read_sql_table
 from sqlalchemy import create_engine
 from sqlalchemy.engine import Engine
 from src.config import AccountingConfig
@@ -224,3 +224,50 @@
             )
             start = start + size
         return pd.concat(res)
+
+    @classmethod
+    def write_data(
+        cls,
+        type_of_data: str,
+        new_data: DataFrame,
+        table_name: str,
+        recreate_table: bool = False,
+    ) -> None:
+        """Write new data into database table.
+        Data is upserted: rows are inserted if their index does not exist yet and updated
+        otherwise. If recreate_table is set, the table is replaced by the new data."""
+        if recreate_table:
+            log.info(f"(Re)creating table {table_name}.")
+            data = new_data
+        else:
+            # set index for upserting data depending on type of data
+            match type_of_data:
+                case "batch":
+                    index_cols = ["environment", "auction_id"]
+                case "order":
+                    index_cols = ["environment", "auction_id", "order_uid"]
+                case _:
+                    raise ValueError(f"Unknown type {type_of_data}")
+
+            # set index of new data
+            new_data = new_data.set_index(index_cols)
+            # try getting table data from database, just use new_data if table is not available
+            try:
+                data = read_sql_table(
+                    table_name,
+                    cls.pg_engine(OrderbookEnv.ANALYTICS),
+                    index_col=index_cols,
+                )
+                # upsert data (insert if possible, otherwise update)
+                log.info(f"Upserting into table {table_name}.")
+                data = pd.concat([data[~data.index.isin(new_data.index)], new_data])
+            except ValueError:  # this catches the case of a missing table
+                data = new_data
+                log.info(f"Creating new table {table_name}.")
+            data = data.reset_index()
+        data.to_sql(
+            table_name,
+            cls.pg_engine(OrderbookEnv.ANALYTICS),
+            if_exists="replace",
+            index=False,
+        )
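A self-contained illustration of the upsert strategy `write_data` uses above: keep the existing rows whose index is not re-delivered, then append the new batch. The toy frames and values here are made up for demonstration; no database is involved:

```python
# Toy demonstration of the pandas-level upsert used in OrderbookFetcher.write_data.
import pandas as pd

index_cols = ["environment", "auction_id"]  # index used for "batch" data
existing = pd.DataFrame(
    {"environment": ["prod", "prod"], "auction_id": [1, 2], "value": [10, 20]}
).set_index(index_cols)
new_data = pd.DataFrame(
    {"environment": ["prod", "prod"], "auction_id": [2, 3], "value": [21, 30]}
).set_index(index_cols)

# keep rows that are not re-delivered, then append the new batch:
# auction 1 is kept, auction 2 is updated to 21, auction 3 is inserted
data = pd.concat([existing[~existing.index.isin(new_data.index)], new_data])
print(data.reset_index())
```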