diff --git a/discovery-provider/src/queries/notifications.py b/discovery-provider/src/queries/notifications.py index c3a115ca31c..c12a2e3b720 100644 --- a/discovery-provider/src/queries/notifications.py +++ b/discovery-provider/src/queries/notifications.py @@ -24,23 +24,20 @@ from src.models.milestone import Milestone from src.queries import response_name_constants as const from src.queries.get_prev_track_entries import get_prev_track_entries -from src.queries.get_sol_rewards_manager import ( - get_latest_cached_sol_rewards_manager_db, - get_latest_cached_sol_rewards_manager_program_tx, -) from src.queries.query_helpers import ( get_follower_count_dict, get_repost_counts, get_save_counts, ) -from src.tasks.index_listen_count_milestones import ( - LISTEN_COUNT_MILESTONE, - PROCESSED_LISTEN_MILESTONE, -) +from src.tasks.index_listen_count_milestones import LISTEN_COUNT_MILESTONE from src.utils.config import shared_config from src.utils.db_session import get_db_read_replica from src.utils.redis_connection import get_redis -from src.utils.redis_constants import latest_sol_aggregate_tips_slot_key +from src.utils.redis_constants import ( + latest_sol_aggregate_tips_slot_key, + latest_sol_listen_count_milestones_slot_key, + latest_sol_rewards_manager_slot_key, +) logger = logging.getLogger(__name__) bp = Blueprint("notifications", __name__) @@ -995,16 +992,13 @@ def notifications(): def get_max_slot(redis: Redis): - listen_milestone_slot = redis.get(PROCESSED_LISTEN_MILESTONE) + listen_milestone_slot = redis.get(latest_sol_listen_count_milestones_slot_key) if listen_milestone_slot: listen_milestone_slot = int(listen_milestone_slot) - rewards_manager_db_cache = get_latest_cached_sol_rewards_manager_db(redis) - tx_cache = get_latest_cached_sol_rewards_manager_program_tx(redis) - rewards_manager_slot = None - if tx_cache and rewards_manager_db_cache: - if tx_cache["slot"] != rewards_manager_db_cache["slot"]: - rewards_manager_slot = rewards_manager_db_cache["slot"] + rewards_manager_slot = redis.get(latest_sol_rewards_manager_slot_key) + if rewards_manager_slot: + rewards_manager_slot = int(rewards_manager_slot) supporter_rank_up_slot = redis.get(latest_sol_aggregate_tips_slot_key) if supporter_rank_up_slot: diff --git a/discovery-provider/src/tasks/index_listen_count_milestones.py b/discovery-provider/src/tasks/index_listen_count_milestones.py index c9230727c06..965108080fc 100644 --- a/discovery-provider/src/tasks/index_listen_count_milestones.py +++ b/discovery-provider/src/tasks/index_listen_count_milestones.py @@ -9,6 +9,10 @@ from src.models.models import AggregatePlays from src.tasks.celery_app import celery from src.utils.redis_cache import get_json_cached_key +from src.utils.redis_constants import ( + latest_sol_listen_count_milestones_slot_key, + latest_sol_plays_slot_key, +) from src.utils.session_manager import SessionManager logger = logging.getLogger(__name__) @@ -75,6 +79,7 @@ def index_listen_count_milestones(db: SessionManager, redis: Redis): logger.info( "index_listen_count_milestones.py | Start calculating listen count milestones" ) + latest_plays_slot = redis.get(latest_sol_plays_slot_key) job_start = time.time() with db.scoped_session() as session: current_play_indexing = get_json_cached_key(redis, CURRENT_PLAY_INDEXING) @@ -144,6 +149,8 @@ def index_listen_count_milestones(db: SessionManager, redis: Redis): f"index_listen_count_milestones.py | Finished calculating trending in {job_total} seconds", extra={"job": "index_listen_count_milestones", "total_time": job_total}, ) + if latest_plays_slot is not None: + redis.set(latest_sol_listen_count_milestones_slot_key, int(latest_plays_slot)) # ####### CELERY TASKS ####### # diff --git a/discovery-provider/src/tasks/index_rewards_manager.py b/discovery-provider/src/tasks/index_rewards_manager.py index 9628e13fdcf..9e885207d1a 100644 --- a/discovery-provider/src/tasks/index_rewards_manager.py +++ b/discovery-provider/src/tasks/index_rewards_manager.py @@ -41,6 +41,7 @@ from src.utils.redis_constants import ( latest_sol_rewards_manager_db_tx_key, latest_sol_rewards_manager_program_tx_key, + latest_sol_rewards_manager_slot_key, ) from src.utils.session_manager import SessionManager @@ -500,6 +501,7 @@ def process_transaction_signatures( "timestamp": last_tx["timestamp"], }, ) + return last_tx def process_solana_rewards_manager( @@ -514,6 +516,13 @@ def process_solana_rewards_manager( if not REWARDS_MANAGER_ACCOUNT: logger.error("index_rewards_manager.py | reward manager account missing") return + + # Get the latests slot available globally before fetching txs to keep track of indexing progress + try: + latest_global_slot = solana_client_manager.get_block_height() + except: + logger.error("index_rewards_manager.py | Failed to get block height") + # List of signatures that will be populated as we traverse recent operations transaction_signatures = get_transaction_signatures( solana_client_manager, @@ -525,9 +534,13 @@ def process_solana_rewards_manager( ) logger.info(f"index_rewards_manager.py | {transaction_signatures}") - process_transaction_signatures( + last_tx = process_transaction_signatures( solana_client_manager, db, redis, transaction_signatures ) + if last_tx: + redis.set(latest_sol_rewards_manager_slot_key, last_tx["slot"]) + elif latest_global_slot is not None: + redis.set(latest_sol_rewards_manager_slot_key, latest_global_slot) # ####### CELERY TASKS ####### # diff --git a/discovery-provider/src/tasks/index_solana_plays.py b/discovery-provider/src/tasks/index_solana_plays.py index 321563fe03a..80eb34f2db5 100644 --- a/discovery-provider/src/tasks/index_solana_plays.py +++ b/discovery-provider/src/tasks/index_solana_plays.py @@ -33,6 +33,7 @@ from src.utils.redis_constants import ( latest_sol_play_db_tx_key, latest_sol_play_program_tx_key, + latest_sol_plays_slot_key, ) TRACK_LISTEN_PROGRAM = shared_config["solana"]["track_listen_count_address"] @@ -531,6 +532,12 @@ def process_solana_plays(solana_client_manager: SolanaClientManager, redis: Redi # Current batch page_count = 0 + # Get the latests slot available globally before fetching txs to keep track of indexing progress + try: + latest_global_slot = solana_client_manager.get_block_height() + except: + logger.error("index_solana_plays.py | Failed to get block height") + # Traverse recent records until an intersection is found with existing Plays table while not intersection_found: logger.info( @@ -622,6 +629,11 @@ def process_solana_plays(solana_client_manager: SolanaClientManager, redis: Redi ) raise e + if last_tx: + redis.set(latest_sol_plays_slot_key, last_tx["slot"]) + elif latest_global_slot is not None: + redis.set(latest_sol_plays_slot_key, latest_global_slot) + @celery.task(name="index_solana_plays", bind=True) def index_solana_plays(self): diff --git a/discovery-provider/src/utils/redis_constants.py b/discovery-provider/src/utils/redis_constants.py index dc79fad9a33..fc8e52c3490 100644 --- a/discovery-provider/src/utils/redis_constants.py +++ b/discovery-provider/src/utils/redis_constants.py @@ -41,3 +41,6 @@ # Used to get the latest processed slot of each indexing task, using the global slots instead of the per-program slots latest_sol_user_bank_slot_key = "latest_sol_slot:user_bank" latest_sol_aggregate_tips_slot_key = "latest_sol_slot:aggregate_tips" +latest_sol_plays_slot_key = "latest_sol_slot:plays" +latest_sol_listen_count_milestones_slot_key = "latest_sol_slot:listen_count_milestones" +latest_sol_rewards_manager_slot_key = "latest_sol_slot:rewards_manager"