Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use global slot for solana notifications #3020

Merged
merged 3 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 10 additions & 16 deletions discovery-provider/src/queries/notifications.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,23 +24,20 @@
from src.models.milestone import Milestone
from src.queries import response_name_constants as const
from src.queries.get_prev_track_entries import get_prev_track_entries
from src.queries.get_sol_rewards_manager import (
get_latest_cached_sol_rewards_manager_db,
get_latest_cached_sol_rewards_manager_program_tx,
)
from src.queries.query_helpers import (
get_follower_count_dict,
get_repost_counts,
get_save_counts,
)
from src.tasks.index_listen_count_milestones import (
LISTEN_COUNT_MILESTONE,
PROCESSED_LISTEN_MILESTONE,
)
from src.tasks.index_listen_count_milestones import LISTEN_COUNT_MILESTONE
from src.utils.config import shared_config
from src.utils.db_session import get_db_read_replica
from src.utils.redis_connection import get_redis
from src.utils.redis_constants import latest_sol_aggregate_tips_slot_key
from src.utils.redis_constants import (
latest_sol_aggregate_tips_slot_key,
latest_sol_listen_count_milestones_slot_key,
latest_sol_rewards_manager_slot_key,
)

logger = logging.getLogger(__name__)
bp = Blueprint("notifications", __name__)
Expand Down Expand Up @@ -995,16 +992,13 @@ def notifications():


def get_max_slot(redis: Redis):
listen_milestone_slot = redis.get(PROCESSED_LISTEN_MILESTONE)
listen_milestone_slot = redis.get(latest_sol_listen_count_milestones_slot_key)
if listen_milestone_slot:
listen_milestone_slot = int(listen_milestone_slot)

rewards_manager_db_cache = get_latest_cached_sol_rewards_manager_db(redis)
tx_cache = get_latest_cached_sol_rewards_manager_program_tx(redis)
rewards_manager_slot = None
if tx_cache and rewards_manager_db_cache:
if tx_cache["slot"] != rewards_manager_db_cache["slot"]:
rewards_manager_slot = rewards_manager_db_cache["slot"]
rewards_manager_slot = redis.get(latest_sol_rewards_manager_slot_key)
if rewards_manager_slot:
rewards_manager_slot = int(rewards_manager_slot)

supporter_rank_up_slot = redis.get(latest_sol_aggregate_tips_slot_key)
if supporter_rank_up_slot:
Expand Down
7 changes: 7 additions & 0 deletions discovery-provider/src/tasks/index_listen_count_milestones.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@
from src.models.models import AggregatePlays
from src.tasks.celery_app import celery
from src.utils.redis_cache import get_json_cached_key
from src.utils.redis_constants import (
latest_sol_listen_count_milestones_slot_key,
latest_sol_plays_slot_key,
)
from src.utils.session_manager import SessionManager

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -75,6 +79,7 @@ def index_listen_count_milestones(db: SessionManager, redis: Redis):
logger.info(
"index_listen_count_milestones.py | Start calculating listen count milestones"
)
latest_plays_slot = redis.get(latest_sol_plays_slot_key)
job_start = time.time()
with db.scoped_session() as session:
current_play_indexing = get_json_cached_key(redis, CURRENT_PLAY_INDEXING)
Expand Down Expand Up @@ -144,6 +149,8 @@ def index_listen_count_milestones(db: SessionManager, redis: Redis):
f"index_listen_count_milestones.py | Finished calculating trending in {job_total} seconds",
extra={"job": "index_listen_count_milestones", "total_time": job_total},
)
if latest_plays_slot is not None:
redis.set(latest_sol_listen_count_milestones_slot_key, int(latest_plays_slot))


# ####### CELERY TASKS ####### #
Expand Down
15 changes: 14 additions & 1 deletion discovery-provider/src/tasks/index_rewards_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from src.utils.redis_constants import (
latest_sol_rewards_manager_db_tx_key,
latest_sol_rewards_manager_program_tx_key,
latest_sol_rewards_manager_slot_key,
)
from src.utils.session_manager import SessionManager

Expand Down Expand Up @@ -500,6 +501,7 @@ def process_transaction_signatures(
"timestamp": last_tx["timestamp"],
},
)
return last_tx


def process_solana_rewards_manager(
Expand All @@ -514,6 +516,13 @@ def process_solana_rewards_manager(
if not REWARDS_MANAGER_ACCOUNT:
logger.error("index_rewards_manager.py | reward manager account missing")
return

# Get the latests slot available globally before fetching txs to keep track of indexing progress
try:
latest_global_slot = solana_client_manager.get_block_height()
except:
logger.error("index_rewards_manager.py | Failed to get block height")

# List of signatures that will be populated as we traverse recent operations
transaction_signatures = get_transaction_signatures(
solana_client_manager,
Expand All @@ -525,9 +534,13 @@ def process_solana_rewards_manager(
)
logger.info(f"index_rewards_manager.py | {transaction_signatures}")

process_transaction_signatures(
last_tx = process_transaction_signatures(
solana_client_manager, db, redis, transaction_signatures
)
if last_tx:
redis.set(latest_sol_rewards_manager_slot_key, last_tx["slot"])
elif latest_global_slot is not None:
redis.set(latest_sol_rewards_manager_slot_key, latest_global_slot)


# ####### CELERY TASKS ####### #
Expand Down
12 changes: 12 additions & 0 deletions discovery-provider/src/tasks/index_solana_plays.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from src.utils.redis_constants import (
latest_sol_play_db_tx_key,
latest_sol_play_program_tx_key,
latest_sol_plays_slot_key,
)

TRACK_LISTEN_PROGRAM = shared_config["solana"]["track_listen_count_address"]
Expand Down Expand Up @@ -531,6 +532,12 @@ def process_solana_plays(solana_client_manager: SolanaClientManager, redis: Redi
# Current batch
page_count = 0

# Get the latests slot available globally before fetching txs to keep track of indexing progress
try:
latest_global_slot = solana_client_manager.get_block_height()
except:
logger.error("index_solana_plays.py | Failed to get block height")

# Traverse recent records until an intersection is found with existing Plays table
while not intersection_found:
logger.info(
Expand Down Expand Up @@ -622,6 +629,11 @@ def process_solana_plays(solana_client_manager: SolanaClientManager, redis: Redi
)
raise e

if last_tx:
redis.set(latest_sol_plays_slot_key, last_tx["slot"])
elif latest_global_slot is not None:
redis.set(latest_sol_plays_slot_key, latest_global_slot)


@celery.task(name="index_solana_plays", bind=True)
def index_solana_plays(self):
Expand Down
3 changes: 3 additions & 0 deletions discovery-provider/src/utils/redis_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,6 @@
# Used to get the latest processed slot of each indexing task, using the global slots instead of the per-program slots
latest_sol_user_bank_slot_key = "latest_sol_slot:user_bank"
latest_sol_aggregate_tips_slot_key = "latest_sol_slot:aggregate_tips"
latest_sol_plays_slot_key = "latest_sol_slot:plays"
latest_sol_listen_count_milestones_slot_key = "latest_sol_slot:listen_count_milestones"
latest_sol_rewards_manager_slot_key = "latest_sol_slot:rewards_manager"