diff --git a/discovery-provider/alembic/env.py b/discovery-provider/alembic/env.py index 4cc4bafc84b..3e2d1f1ad9d 100644 --- a/discovery-provider/alembic/env.py +++ b/discovery-provider/alembic/env.py @@ -4,10 +4,10 @@ import re from alembic import context +from alembic.ddl.base import AddColumn, DropColumn, visit_add_column, visit_drop_column from sqlalchemy import engine_from_config, pool from sqlalchemy.ext.compiler import compiles from sqlalchemy.schema import CreateIndex, CreateTable, DropIndex, DropTable -from sqlalchemy.sql import ddl # add your model's MetaData object here # for 'autogenerate' support @@ -79,32 +79,39 @@ def run_migrations_online(): @compiles(CreateIndex) @compiles(CreateTable) +@compiles(AddColumn) def _add_if_not_exists(element, compiler, **kw): """Adds support for IF NOT EXISTS to CREATE TABLE and CREATE INDEX commands""" # Inspired by https://github.com/AudiusProject/audius-protocol/pull/2997 - if isinstance(element, ddl.CreateIndex): + if isinstance(element, CreateIndex): output = compiler.visit_create_index(element, **kw) - elif isinstance(element, ddl.CreateTable): + elif isinstance(element, CreateTable): output = compiler.visit_create_table(element, **kw) - if element.element.info.get("if_not_exists"): - output = re.sub( - "CREATE (TABLE|INDEX)", r"CREATE \g<1> IF NOT EXISTS", output, re.S + elif isinstance(element, AddColumn): + output = visit_add_column(element, compiler, **kw) + return re.sub( + "(CREATE|ADD) (TABLE|INDEX|COLUMN)", + r"\g<1> \g<2> IF NOT EXISTS", + output, + re.S, ) - return output @compiles(DropIndex) @compiles(DropTable) +@compiles(DropColumn) def _add_if_exists(element, compiler, **kw): """Adds support for IF EXISTS to DROP TABLE and DROP INDEX commands""" # Inspired by https://github.com/AudiusProject/audius-protocol/pull/2997 - if isinstance(element, ddl.DropIndex): + if isinstance(element, DropIndex): output = compiler.visit_drop_index(element, **kw) - elif isinstance(element, ddl.DropTable): + elif isinstance(element, DropTable): output = compiler.visit_drop_table(element, **kw) - if element.element.info.get("if_exists"): - output = re.sub("DROP (TABLE|INDEX)", r"DROP \g<1> IF EXISTS", output, re.S) - return output + elif isinstance(element, DropColumn): + output = visit_drop_column(element, compiler, **kw) + return re.sub( + "DROP (TABLE|INDEX|COLUMN)", r"DROP \g<1> IF EXISTS", output, re.S + ) if context.is_offline_mode(): diff --git a/discovery-provider/alembic/versions/f11f9e83b28b_add_supporter_and_supporting_to_.py b/discovery-provider/alembic/versions/f11f9e83b28b_add_supporter_and_supporting_to_.py new file mode 100644 index 00000000000..58439da08b4 --- /dev/null +++ b/discovery-provider/alembic/versions/f11f9e83b28b_add_supporter_and_supporting_to_.py @@ -0,0 +1,35 @@ +"""Add supporter and supporting to aggregate user + +Revision ID: f11f9e83b28b +Revises: 35198266d709 +Create Date: 2022-05-11 08:10:57.138022 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "f11f9e83b28b" +down_revision = "35198266d709" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column( + "aggregate_user", + sa.Column("supporter_count", sa.Integer(), server_default="0", nullable=False), + ) + op.add_column( + "aggregate_user", + sa.Column("supporting_count", sa.Integer(), server_default="0", nullable=False), + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("aggregate_user", "supporting_count") + op.drop_column("aggregate_user", "supporter_count") + # ### end Alembic commands ### diff --git a/discovery-provider/src/api/v1/models/users.py b/discovery-provider/src/api/v1/models/users.py index 2d6821a2aec..76e91dfad0d 100644 --- a/discovery-provider/src/api/v1/models/users.py +++ b/discovery-provider/src/api/v1/models/users.py @@ -45,6 +45,8 @@ "is_deactivated": fields.Boolean(required=True), "erc_wallet": fields.String(requred=True), "spl_wallet": fields.String(required=True), + "supporter_count": fields.Integer(required=True), + "supporting_count": fields.Integer(required=True), }, ) diff --git a/discovery-provider/src/models/models.py b/discovery-provider/src/models/models.py index b6808622f11..3c7b4c70e8d 100644 --- a/discovery-provider/src/models/models.py +++ b/discovery-provider/src/models/models.py @@ -1103,6 +1103,8 @@ class AggregateUser(Base): following_count = Column(Integer, nullable=False) repost_count = Column(Integer, nullable=False) track_save_count = Column(Integer, nullable=False) + supporter_count = Column(Integer, nullable=False, server_default="0") + supporting_count = Column(Integer, nullable=False, server_default="0") Index("aggregate_user_idx", "user_id", unique=True) diff --git a/discovery-provider/src/queries/query_helpers.py b/discovery-provider/src/queries/query_helpers.py index ea90974719a..b2118832646 100644 --- a/discovery-provider/src/queries/query_helpers.py +++ b/discovery-provider/src/queries/query_helpers.py @@ -21,8 +21,8 @@ SaveType, Track, User, + UserBankAccount, ) -from src.models.user_bank import UserBankAccount from src.queries import response_name_constants from src.queries.get_balances import get_balances from src.queries.get_unpopulated_users import get_unpopulated_users, set_users_in_cache @@ -110,7 +110,7 @@ def parse_sort_param(base_query, model, whitelist_sort_params): # given list of user ids and corresponding users, populates each user object with: -# track_count, playlist_count, album_count, follower_count, followee_count, repost_count +# track_count, playlist_count, album_count, follower_count, followee_count, repost_count, supporter_count, supporting_count # if current_user_id available, populates does_current_user_follow, followee_follows def populate_user_metadata( session, user_ids, users, current_user_id, with_track_save_count=False @@ -125,6 +125,8 @@ def populate_user_metadata( AggregateUser.following_count, AggregateUser.repost_count, AggregateUser.track_save_count, + AggregateUser.supporter_count, + AggregateUser.supporting_count, ) .filter(AggregateUser.user_id.in_(user_ids)) .all() @@ -136,7 +138,7 @@ def populate_user_metadata( ).filter(UserBankAccount.ethereum_address.in_(user["wallet"] for user in users)) user_banks_dict = dict(user_banks) - # build dict of user id --> track/playlist/album/follower/followee/repost/track save counts + # build dict of user id --> track/playlist/album/follower/followee/repost/track save/supporting/supporter counts count_dict = { user_id: { response_name_constants.track_count: track_count, @@ -146,6 +148,8 @@ def populate_user_metadata( response_name_constants.followee_count: following_count, response_name_constants.repost_count: repost_count, response_name_constants.track_save_count: track_save_count, + response_name_constants.supporter_count: supporter_count, + response_name_constants.supporting_count: supporting_count, } for ( user_id, @@ -156,6 +160,8 @@ def populate_user_metadata( following_count, repost_count, track_save_count, + supporter_count, + supporting_count, ) in aggregate_user } @@ -257,6 +263,12 @@ def populate_user_metadata( user[response_name_constants.track_save_count] = count_dict.get( user_id, {} ).get(response_name_constants.track_save_count, 0) + user[response_name_constants.supporter_count] = count_dict.get(user_id, {}).get( + response_name_constants.supporter_count, 0 + ) + user[response_name_constants.supporting_count] = count_dict.get( + user_id, {} + ).get(response_name_constants.supporting_count, 0) # current user specific user[ response_name_constants.does_current_user_follow @@ -282,7 +294,9 @@ def populate_user_metadata( user[response_name_constants.spl_wallet] = user_banks_dict.get( user["wallet"], None ) - user["does_follow_current_user"] = user_id in follows_current_user_set + user[response_name_constants.does_follow_current_user] = ( + user_id in follows_current_user_set + ) return users diff --git a/discovery-provider/src/queries/response_name_constants.py b/discovery-provider/src/queries/response_name_constants.py index bad03761888..7129f1a3767 100644 --- a/discovery-provider/src/queries/response_name_constants.py +++ b/discovery-provider/src/queries/response_name_constants.py @@ -19,11 +19,6 @@ # boolean - does the remix track author favorite the track has_remix_author_saved = "has_remix_author_saved" -# boolean - does current user follow given user -does_current_user_follow = "does_current_user_follow" -# integer - number of followees of current user that also follow given user -current_user_followee_follow_count = "current_user_followee_follow_count" - # user metadata user_id = "user_id" # integer - unique id of a user follower_count = "follower_count" # integer - total follower count of given user @@ -48,12 +43,20 @@ waudio_balance = "waudio_balance" erc_wallet = "erc_wallet" spl_wallet = "spl_wallet" +supporter_count = "supporter_count" +supporting_count = "supporting_count" # current user specific # boolean - does current user follow given user does_current_user_follow = "does_current_user_follow" +# boolean - does given user follow current user +does_follow_current_user = "does_follow_current_user" # integer - number of followees of current user that also follow given user current_user_followee_follow_count = "current_user_followee_follow_count" +# boolean - has current user tipped given user +does_current_user_support = "does_current_user_support" +# boolean - has given user tipped current user +does_support_current_user = "does_support_current_user" # feed # string - timestamp of relevant activity on underlying object, used for sorting diff --git a/discovery-provider/src/tasks/index_aggregate_tips.py b/discovery-provider/src/tasks/index_aggregate_tips.py index 0e0d6dda37d..432731e6d68 100644 --- a/discovery-provider/src/tasks/index_aggregate_tips.py +++ b/discovery-provider/src/tasks/index_aggregate_tips.py @@ -28,25 +28,83 @@ UPDATE_AGGREGATE_USER_TIPS_QUERY = """ +-- Update aggregate_tips table: INSERT INTO aggregate_user_tips ( sender_user_id , receiver_user_id , amount ) - SELECT - sender_user_id - , receiver_user_id - , SUM(amount) as amount +SELECT + sender_user_id + , receiver_user_id + , SUM(amount) as amount +FROM user_tips +WHERE + slot > :prev_slot AND + slot <= :current_slot +GROUP BY + sender_user_id + , receiver_user_id +ON CONFLICT (sender_user_id, receiver_user_id) +DO UPDATE + SET amount = EXCLUDED.amount + aggregate_user_tips.amount; + +-- Update aggregate_user supporter/supporting counts: +WITH recent_tips AS ( + SELECT + sender_user_id + , receiver_user_id FROM user_tips WHERE - slot > :prev_slot AND - slot <= :current_slot + slot > :prev_slot AND + slot <= :current_slot GROUP BY - sender_user_id - , receiver_user_id - ON CONFLICT (sender_user_id, receiver_user_id) - DO UPDATE - SET amount = EXCLUDED.amount + aggregate_user_tips.amount + sender_user_id + , receiver_user_id +) +, user_ids AS ( + SELECT sender_user_id AS user_id FROM recent_tips + UNION SELECT receiver_user_id AS user_id FROM recent_tips +), supporting AS ( + SELECT receiver_user_id AS user_id, COUNT(sender_user_id) AS total_supporting + FROM aggregate_user_tips + WHERE receiver_user_id IN (SELECT user_id FROM user_ids) + GROUP BY receiver_user_id +), supporters AS ( + SELECT sender_user_id AS user_id, COUNT(receiver_user_id) AS total_supporters + FROM aggregate_user_tips + WHERE sender_user_id IN (SELECT user_id FROM user_ids) + GROUP BY sender_user_id +) +INSERT INTO aggregate_user ( + user_id, + track_count, + playlist_count, + album_count, + follower_count, + following_count, + repost_count, + track_save_count, + supporter_count, + supporting_count +) +SELECT + supporting.user_id AS user_id, + 0 AS track_count, + 0 AS playlist_count, + 0 AS album_count, 0 AS follower_count, + 0 AS following_count, + 0 AS repost_count, + 0 AS track_save_count, + total_supporting AS supporting_count, + total_supporters AS supporter_count +FROM supporting +FULL OUTER JOIN supporters ON supporting.user_id = supporters.user_id +ON CONFLICT (user_id) +DO + UPDATE SET + supporting_count = EXCLUDED.supporting_count, + supporter_count = EXCLUDED.supporter_count """