From da49b8a6167223e10fca3d047e633721e965911c Mon Sep 17 00:00:00 2001 From: Marcus Pasell Date: Tue, 3 May 2022 21:01:34 +0000 Subject: [PATCH 1/3] Use global slot for solana notifications Use the global slot (block height) for tracking the solana indexing progress as it relates to notifications, otherwise notifications will be gated on the most recent transaction in each program --- .../src/queries/notifications.py | 26 +++++++------------ .../tasks/index_listen_count_milestones.py | 7 +++++ .../src/tasks/index_rewards_manager.py | 17 +++++++++++- .../src/tasks/index_solana_plays.py | 12 +++++++++ .../src/utils/redis_constants.py | 3 +++ 5 files changed, 48 insertions(+), 17 deletions(-) diff --git a/discovery-provider/src/queries/notifications.py b/discovery-provider/src/queries/notifications.py index c3a115ca31c..c12a2e3b720 100644 --- a/discovery-provider/src/queries/notifications.py +++ b/discovery-provider/src/queries/notifications.py @@ -24,23 +24,20 @@ from src.models.milestone import Milestone from src.queries import response_name_constants as const from src.queries.get_prev_track_entries import get_prev_track_entries -from src.queries.get_sol_rewards_manager import ( - get_latest_cached_sol_rewards_manager_db, - get_latest_cached_sol_rewards_manager_program_tx, -) from src.queries.query_helpers import ( get_follower_count_dict, get_repost_counts, get_save_counts, ) -from src.tasks.index_listen_count_milestones import ( - LISTEN_COUNT_MILESTONE, - PROCESSED_LISTEN_MILESTONE, -) +from src.tasks.index_listen_count_milestones import LISTEN_COUNT_MILESTONE from src.utils.config import shared_config from src.utils.db_session import get_db_read_replica from src.utils.redis_connection import get_redis -from src.utils.redis_constants import latest_sol_aggregate_tips_slot_key +from src.utils.redis_constants import ( + latest_sol_aggregate_tips_slot_key, + latest_sol_listen_count_milestones_slot_key, + latest_sol_rewards_manager_slot_key, +) logger = logging.getLogger(__name__) bp = Blueprint("notifications", __name__) @@ -995,16 +992,13 @@ def notifications(): def get_max_slot(redis: Redis): - listen_milestone_slot = redis.get(PROCESSED_LISTEN_MILESTONE) + listen_milestone_slot = redis.get(latest_sol_listen_count_milestones_slot_key) if listen_milestone_slot: listen_milestone_slot = int(listen_milestone_slot) - rewards_manager_db_cache = get_latest_cached_sol_rewards_manager_db(redis) - tx_cache = get_latest_cached_sol_rewards_manager_program_tx(redis) - rewards_manager_slot = None - if tx_cache and rewards_manager_db_cache: - if tx_cache["slot"] != rewards_manager_db_cache["slot"]: - rewards_manager_slot = rewards_manager_db_cache["slot"] + rewards_manager_slot = redis.get(latest_sol_rewards_manager_slot_key) + if rewards_manager_slot: + rewards_manager_slot = int(rewards_manager_slot) supporter_rank_up_slot = redis.get(latest_sol_aggregate_tips_slot_key) if supporter_rank_up_slot: diff --git a/discovery-provider/src/tasks/index_listen_count_milestones.py b/discovery-provider/src/tasks/index_listen_count_milestones.py index c9230727c06..965108080fc 100644 --- a/discovery-provider/src/tasks/index_listen_count_milestones.py +++ b/discovery-provider/src/tasks/index_listen_count_milestones.py @@ -9,6 +9,10 @@ from src.models.models import AggregatePlays from src.tasks.celery_app import celery from src.utils.redis_cache import get_json_cached_key +from src.utils.redis_constants import ( + latest_sol_listen_count_milestones_slot_key, + latest_sol_plays_slot_key, +) from src.utils.session_manager import SessionManager logger = logging.getLogger(__name__) @@ -75,6 +79,7 @@ def index_listen_count_milestones(db: SessionManager, redis: Redis): logger.info( "index_listen_count_milestones.py | Start calculating listen count milestones" ) + latest_plays_slot = redis.get(latest_sol_plays_slot_key) job_start = time.time() with db.scoped_session() as session: current_play_indexing = get_json_cached_key(redis, CURRENT_PLAY_INDEXING) @@ -144,6 +149,8 @@ def index_listen_count_milestones(db: SessionManager, redis: Redis): f"index_listen_count_milestones.py | Finished calculating trending in {job_total} seconds", extra={"job": "index_listen_count_milestones", "total_time": job_total}, ) + if latest_plays_slot is not None: + redis.set(latest_sol_listen_count_milestones_slot_key, int(latest_plays_slot)) # ####### CELERY TASKS ####### # diff --git a/discovery-provider/src/tasks/index_rewards_manager.py b/discovery-provider/src/tasks/index_rewards_manager.py index 9628e13fdcf..35fb7181814 100644 --- a/discovery-provider/src/tasks/index_rewards_manager.py +++ b/discovery-provider/src/tasks/index_rewards_manager.py @@ -41,6 +41,7 @@ from src.utils.redis_constants import ( latest_sol_rewards_manager_db_tx_key, latest_sol_rewards_manager_program_tx_key, + latest_sol_rewards_manager_slot_key, ) from src.utils.session_manager import SessionManager @@ -500,12 +501,15 @@ def process_transaction_signatures( "timestamp": last_tx["timestamp"], }, ) + return last_tx def process_solana_rewards_manager( solana_client_manager: SolanaClientManager, db: SessionManager, redis: Redis ): """Fetches the next set of reward manager transactions and updates the DB with Challenge Disbursements""" + # Get the latests slot available globally before fetching txs to keep track of indexing progress + if not is_valid_rewards_manager_program: logger.error( "index_rewards_manager.py | no valid reward manager program passed" @@ -514,6 +518,13 @@ def process_solana_rewards_manager( if not REWARDS_MANAGER_ACCOUNT: logger.error("index_rewards_manager.py | reward manager account missing") return + + # Get the latests slot available globally before fetching txs to keep track of indexing progress + try: + latest_global_slot = solana_client_manager.get_block_height() + except: + logger.error("index_rewards_manager.py | Failed to get block height") + # List of signatures that will be populated as we traverse recent operations transaction_signatures = get_transaction_signatures( solana_client_manager, @@ -525,9 +536,13 @@ def process_solana_rewards_manager( ) logger.info(f"index_rewards_manager.py | {transaction_signatures}") - process_transaction_signatures( + last_tx = process_transaction_signatures( solana_client_manager, db, redis, transaction_signatures ) + if latest_global_slot is not None: + redis.set(latest_sol_rewards_manager_slot_key, latest_global_slot) + elif last_tx: + redis.set(latest_sol_rewards_manager_slot_key, last_tx["slot"]) # ####### CELERY TASKS ####### # diff --git a/discovery-provider/src/tasks/index_solana_plays.py b/discovery-provider/src/tasks/index_solana_plays.py index 321563fe03a..a77e0caf0c6 100644 --- a/discovery-provider/src/tasks/index_solana_plays.py +++ b/discovery-provider/src/tasks/index_solana_plays.py @@ -33,6 +33,7 @@ from src.utils.redis_constants import ( latest_sol_play_db_tx_key, latest_sol_play_program_tx_key, + latest_sol_plays_slot_key, ) TRACK_LISTEN_PROGRAM = shared_config["solana"]["track_listen_count_address"] @@ -531,6 +532,12 @@ def process_solana_plays(solana_client_manager: SolanaClientManager, redis: Redi # Current batch page_count = 0 + # Get the latests slot available globally before fetching txs to keep track of indexing progress + try: + latest_global_slot = solana_client_manager.get_block_height() + except: + logger.error("index_solana_plays.py | Failed to get block height") + # Traverse recent records until an intersection is found with existing Plays table while not intersection_found: logger.info( @@ -622,6 +629,11 @@ def process_solana_plays(solana_client_manager: SolanaClientManager, redis: Redi ) raise e + if latest_global_slot is not None: + redis.set(latest_sol_plays_slot_key, latest_global_slot) + elif last_tx: + redis.set(latest_sol_plays_slot_key, last_tx["slot"]) + @celery.task(name="index_solana_plays", bind=True) def index_solana_plays(self): diff --git a/discovery-provider/src/utils/redis_constants.py b/discovery-provider/src/utils/redis_constants.py index dc79fad9a33..fc8e52c3490 100644 --- a/discovery-provider/src/utils/redis_constants.py +++ b/discovery-provider/src/utils/redis_constants.py @@ -41,3 +41,6 @@ # Used to get the latest processed slot of each indexing task, using the global slots instead of the per-program slots latest_sol_user_bank_slot_key = "latest_sol_slot:user_bank" latest_sol_aggregate_tips_slot_key = "latest_sol_slot:aggregate_tips" +latest_sol_plays_slot_key = "latest_sol_slot:plays" +latest_sol_listen_count_milestones_slot_key = "latest_sol_slot:listen_count_milestones" +latest_sol_rewards_manager_slot_key = "latest_sol_slot:rewards_manager" From 3b0256f7f94afc3acf68147acb40318b0d00667d Mon Sep 17 00:00:00 2001 From: Marcus Pasell Date: Tue, 3 May 2022 21:06:12 +0000 Subject: [PATCH 2/3] Remove superfluous comment --- discovery-provider/src/tasks/index_rewards_manager.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/discovery-provider/src/tasks/index_rewards_manager.py b/discovery-provider/src/tasks/index_rewards_manager.py index 35fb7181814..95001298c18 100644 --- a/discovery-provider/src/tasks/index_rewards_manager.py +++ b/discovery-provider/src/tasks/index_rewards_manager.py @@ -508,8 +508,6 @@ def process_solana_rewards_manager( solana_client_manager: SolanaClientManager, db: SessionManager, redis: Redis ): """Fetches the next set of reward manager transactions and updates the DB with Challenge Disbursements""" - # Get the latests slot available globally before fetching txs to keep track of indexing progress - if not is_valid_rewards_manager_program: logger.error( "index_rewards_manager.py | no valid reward manager program passed" From 955bc2fb01ef1384f13770a0e7cabb0da2f0b1d7 Mon Sep 17 00:00:00 2001 From: Marcus Pasell Date: Wed, 4 May 2022 21:41:53 +0000 Subject: [PATCH 3/3] Only use global slot when no transactions are processed --- discovery-provider/src/tasks/index_rewards_manager.py | 6 +++--- discovery-provider/src/tasks/index_solana_plays.py | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/discovery-provider/src/tasks/index_rewards_manager.py b/discovery-provider/src/tasks/index_rewards_manager.py index 95001298c18..9e885207d1a 100644 --- a/discovery-provider/src/tasks/index_rewards_manager.py +++ b/discovery-provider/src/tasks/index_rewards_manager.py @@ -537,10 +537,10 @@ def process_solana_rewards_manager( last_tx = process_transaction_signatures( solana_client_manager, db, redis, transaction_signatures ) - if latest_global_slot is not None: - redis.set(latest_sol_rewards_manager_slot_key, latest_global_slot) - elif last_tx: + if last_tx: redis.set(latest_sol_rewards_manager_slot_key, last_tx["slot"]) + elif latest_global_slot is not None: + redis.set(latest_sol_rewards_manager_slot_key, latest_global_slot) # ####### CELERY TASKS ####### # diff --git a/discovery-provider/src/tasks/index_solana_plays.py b/discovery-provider/src/tasks/index_solana_plays.py index a77e0caf0c6..80eb34f2db5 100644 --- a/discovery-provider/src/tasks/index_solana_plays.py +++ b/discovery-provider/src/tasks/index_solana_plays.py @@ -629,10 +629,10 @@ def process_solana_plays(solana_client_manager: SolanaClientManager, redis: Redi ) raise e - if latest_global_slot is not None: - redis.set(latest_sol_plays_slot_key, latest_global_slot) - elif last_tx: + if last_tx: redis.set(latest_sol_plays_slot_key, last_tx["slot"]) + elif latest_global_slot is not None: + redis.set(latest_sol_plays_slot_key, latest_global_slot) @celery.task(name="index_solana_plays", bind=True)