Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: batch or partition DB access and commits for user streak updation #3296

Merged
merged 9 commits into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion backend/oasst_backend/api/v1/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ async def interaction_tx(session: deps.Session):
ur = UserRepository(session, api_client)
task = await tm.handle_interaction(interaction)
if type(task) is protocol_schema.TaskDone:
ur.update_user_last_activity(user=pr.user)
ur.update_user_last_activity(user=pr.user, update_streak=True)
return task

try:
Expand Down
2 changes: 2 additions & 0 deletions backend/oasst_backend/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,8 @@ def validate_user_stats_intervals(cls, v: int):
DISCORD_API_KEY: str | None = None
DISCORD_CHANNEL_ID: str | None = None

USER_STREAK_BATCH_SIZE: int = 1000
andreaskoepf marked this conversation as resolved.
Show resolved Hide resolved

class Config:
env_file = ".env"
env_file_encoding = "utf-8"
Expand Down
53 changes: 16 additions & 37 deletions backend/oasst_backend/scheduled_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
from oasst_backend.models import ApiClient, Message
from oasst_backend.models.db_payload import MessagePayload
from oasst_backend.prompt_repository import PromptRepository
from oasst_backend.user_repository import User
from oasst_backend.utils.database_utils import db_lang_to_postgres_ts_lang, default_session_factory
from oasst_backend.utils.hugging_face import HfClassificationModel, HfEmbeddingModel, HfUrl, HuggingFaceAPI
from oasst_shared.utils import utcnow
from sqlalchemy import func
from sqlmodel import select

startup_time: datetime = utcnow()

Expand Down Expand Up @@ -94,44 +92,25 @@ def update_search_vectors(batch_size: int) -> None:

@shared_task(name="update_user_streak")
def update_user_streak() -> None:
# check if user has been active in the last 24h and update streak accordingly
logger.info("update_user_streak start...")
try:
with default_session_factory() as session:
current_time = utcnow()
timedelta = current_time - startup_time
if timedelta.days > 0:
# Update only greater than 24 hours . Do nothing
logger.info("Process timedelta greater than 24h")
statement = select(User)
result = session.exec(statement).all()
if result is not None:
for user in result:
last_activity_date = user.last_activity_date
streak_last_day_date = user.streak_last_day_date
# set NULL streak_days to 0
if user.streak_days is None:
user.streak_days = 0
# if the user had completed a task
if last_activity_date is not None:
lastactitvitydelta = current_time - last_activity_date
# if the user missed consecutive days of completing a task
# reset the streak_days to 0 and set streak_last_day_date to the current_time
if lastactitvitydelta.days > 1 or user.streak_days is None:
user.streak_days = 0
user.streak_last_day_date = current_time
# streak_last_day_date has a current timestamp in DB. Ideally should not be NULL.
if streak_last_day_date is not None:
streak_delta = current_time - streak_last_day_date
# if user completed tasks on consecutive days then increment the streak days
# update the streak_last_day_date to current time for the next calculation
if streak_delta.days > 0:
user.streak_days += 1
user.streak_last_day_date = current_time
session.add(user)
session.commit()

else:
logger.info("Not yet 24hours since the process started! ...")
logger.info("User streak end...")
logger.info("Process timedelta greater than 24h")

# Reset streak_days to 0 for users with more than one day of inactivity
reset_query = f"""
UPDATE "user"
SET streak_days = 0,
streak_last_day_date = '{current_time}'
andreaskoepf marked this conversation as resolved.
Show resolved Hide resolved
WHERE ('{current_time}' - last_activity_date) > interval '1 day'
OR streak_days IS NULL
OR last_activity_date IS NULL
"""
session.execute(reset_query)
session.commit()

logger.info("User streak reset successfully!")
except Exception as e:
logger.error(str(e))
21 changes: 19 additions & 2 deletions backend/oasst_backend/user_repository.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Optional
from uuid import UUID

from loguru import logger
from oasst_backend.config import settings
from oasst_backend.models import ApiClient, User
from oasst_backend.utils.database_utils import CommitMode, managed_tx_method
Expand Down Expand Up @@ -332,6 +333,22 @@ def query_users_ordered_by_display_name(
return qry.all()

@managed_tx_method(CommitMode.FLUSH)
def update_user_last_activity(self, user: User) -> None:
user.last_activity_date = utcnow()
def update_user_last_activity(self, user: User, update_streak: bool = False) -> None:
current_time = utcnow()
user.last_activity_date = current_time
streak_last_day_date = user.streak_last_day_date

if update_streak:
try:
if user.streak_last_day_date is None:
# this should only happen when the user is first created
user.streak_last_day_date = user.last_activity_date
else:
# if the user has not been active for more than a day increment it by 1
if current_time.days != streak_last_day_date.days:
user.streak_last_day_date = user.last_activity_date
user.streak_days += 1
except Exception as e:
logger.error(f"Error updating user streak for user {user.id}: {e}")

self.db.add(user)