Skip to content

Commit

Permalink
Use global slot for solana notifications (#3020)
Browse files Browse the repository at this point in the history
* Use global slot for solana notifications

Use the global slot (block height) for tracking the solana indexing progress as it relates to notifications, otherwise notifications will be gated on the most recent transaction in each program

* Remove superfluous comment

* Only use global slot when no transactions are processed
  • Loading branch information
rickyrombo authored May 11, 2022
1 parent aa032d9 commit bc092fd
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 17 deletions.
26 changes: 10 additions & 16 deletions discovery-provider/src/queries/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,23 +25,20 @@
)
from src.queries import response_name_constants as const
from src.queries.get_prev_track_entries import get_prev_track_entries
from src.queries.get_sol_rewards_manager import (
get_latest_cached_sol_rewards_manager_db,
get_latest_cached_sol_rewards_manager_program_tx,
)
from src.queries.query_helpers import (
get_follower_count_dict,
get_repost_counts,
get_save_counts,
)
from src.tasks.index_listen_count_milestones import (
LISTEN_COUNT_MILESTONE,
PROCESSED_LISTEN_MILESTONE,
)
from src.tasks.index_listen_count_milestones import LISTEN_COUNT_MILESTONE
from src.utils.config import shared_config
from src.utils.db_session import get_db_read_replica
from src.utils.redis_connection import get_redis
from src.utils.redis_constants import latest_sol_aggregate_tips_slot_key
from src.utils.redis_constants import (
latest_sol_aggregate_tips_slot_key,
latest_sol_listen_count_milestones_slot_key,
latest_sol_rewards_manager_slot_key,
)

logger = logging.getLogger(__name__)
bp = Blueprint("notifications", __name__)
Expand Down Expand Up @@ -996,16 +993,13 @@ def notifications():


def get_max_slot(redis: Redis):
listen_milestone_slot = redis.get(PROCESSED_LISTEN_MILESTONE)
listen_milestone_slot = redis.get(latest_sol_listen_count_milestones_slot_key)
if listen_milestone_slot:
listen_milestone_slot = int(listen_milestone_slot)

rewards_manager_db_cache = get_latest_cached_sol_rewards_manager_db(redis)
tx_cache = get_latest_cached_sol_rewards_manager_program_tx(redis)
rewards_manager_slot = None
if tx_cache and rewards_manager_db_cache:
if tx_cache["slot"] != rewards_manager_db_cache["slot"]:
rewards_manager_slot = rewards_manager_db_cache["slot"]
rewards_manager_slot = redis.get(latest_sol_rewards_manager_slot_key)
if rewards_manager_slot:
rewards_manager_slot = int(rewards_manager_slot)

supporter_rank_up_slot = redis.get(latest_sol_aggregate_tips_slot_key)
if supporter_rank_up_slot:
Expand Down
7 changes: 7 additions & 0 deletions discovery-provider/src/tasks/index_listen_count_milestones.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
from src.models.models import AggregatePlays
from src.tasks.celery_app import celery
from src.utils.redis_cache import get_json_cached_key
from src.utils.redis_constants import (
latest_sol_listen_count_milestones_slot_key,
latest_sol_plays_slot_key,
)
from src.utils.session_manager import SessionManager

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -75,6 +79,7 @@ def index_listen_count_milestones(db: SessionManager, redis: Redis):
logger.info(
"index_listen_count_milestones.py | Start calculating listen count milestones"
)
latest_plays_slot = redis.get(latest_sol_plays_slot_key)
job_start = time.time()
with db.scoped_session() as session:
current_play_indexing = get_json_cached_key(redis, CURRENT_PLAY_INDEXING)
Expand Down Expand Up @@ -144,6 +149,8 @@ def index_listen_count_milestones(db: SessionManager, redis: Redis):
f"index_listen_count_milestones.py | Finished calculating trending in {job_total} seconds",
extra={"job": "index_listen_count_milestones", "total_time": job_total},
)
if latest_plays_slot is not None:
redis.set(latest_sol_listen_count_milestones_slot_key, int(latest_plays_slot))


# ####### CELERY TASKS ####### #
Expand Down
15 changes: 14 additions & 1 deletion discovery-provider/src/tasks/index_rewards_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from src.utils.redis_constants import (
latest_sol_rewards_manager_db_tx_key,
latest_sol_rewards_manager_program_tx_key,
latest_sol_rewards_manager_slot_key,
)
from src.utils.session_manager import SessionManager

Expand Down Expand Up @@ -500,6 +501,7 @@ def process_transaction_signatures(
"timestamp": last_tx["timestamp"],
},
)
return last_tx


def process_solana_rewards_manager(
Expand All @@ -514,6 +516,13 @@ def process_solana_rewards_manager(
if not REWARDS_MANAGER_ACCOUNT:
logger.error("index_rewards_manager.py | reward manager account missing")
return

# Get the latests slot available globally before fetching txs to keep track of indexing progress
try:
latest_global_slot = solana_client_manager.get_block_height()
except:
logger.error("index_rewards_manager.py | Failed to get block height")

# List of signatures that will be populated as we traverse recent operations
transaction_signatures = get_transaction_signatures(
solana_client_manager,
Expand All @@ -525,9 +534,13 @@ def process_solana_rewards_manager(
)
logger.info(f"index_rewards_manager.py | {transaction_signatures}")

process_transaction_signatures(
last_tx = process_transaction_signatures(
solana_client_manager, db, redis, transaction_signatures
)
if last_tx:
redis.set(latest_sol_rewards_manager_slot_key, last_tx["slot"])
elif latest_global_slot is not None:
redis.set(latest_sol_rewards_manager_slot_key, latest_global_slot)


# ####### CELERY TASKS ####### #
Expand Down
12 changes: 12 additions & 0 deletions discovery-provider/src/tasks/index_solana_plays.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from src.utils.redis_constants import (
latest_sol_play_db_tx_key,
latest_sol_play_program_tx_key,
latest_sol_plays_slot_key,
)

TRACK_LISTEN_PROGRAM = shared_config["solana"]["track_listen_count_address"]
Expand Down Expand Up @@ -531,6 +532,12 @@ def process_solana_plays(solana_client_manager: SolanaClientManager, redis: Redi
# Current batch
page_count = 0

# Get the latests slot available globally before fetching txs to keep track of indexing progress
try:
latest_global_slot = solana_client_manager.get_block_height()
except:
logger.error("index_solana_plays.py | Failed to get block height")

# Traverse recent records until an intersection is found with existing Plays table
while not intersection_found:
logger.info(
Expand Down Expand Up @@ -622,6 +629,11 @@ def process_solana_plays(solana_client_manager: SolanaClientManager, redis: Redi
)
raise e

if last_tx:
redis.set(latest_sol_plays_slot_key, last_tx["slot"])
elif latest_global_slot is not None:
redis.set(latest_sol_plays_slot_key, latest_global_slot)


@celery.task(name="index_solana_plays", bind=True)
def index_solana_plays(self):
Expand Down
3 changes: 3 additions & 0 deletions discovery-provider/src/utils/redis_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@
# Used to get the latest processed slot of each indexing task, using the global slots instead of the per-program slots
latest_sol_user_bank_slot_key = "latest_sol_slot:user_bank"
latest_sol_aggregate_tips_slot_key = "latest_sol_slot:aggregate_tips"
latest_sol_plays_slot_key = "latest_sol_slot:plays"
latest_sol_listen_count_milestones_slot_key = "latest_sol_slot:listen_count_milestones"
latest_sol_rewards_manager_slot_key = "latest_sol_slot:rewards_manager"

0 comments on commit bc092fd

Please sign in to comment.