From 51810e2ab3d92f98bf9dd78f6ac4ab4d6c98e1d6 Mon Sep 17 00:00:00 2001 From: Marcus Pasell Date: Wed, 11 May 2022 09:55:22 +0000 Subject: [PATCH 1/6] Get supporter/supporting count on the user object --- ...83b28b_add_supporter_and_supporting_to_.py | 35 ++++++++ discovery-provider/src/api/v1/models/users.py | 2 + discovery-provider/src/models/models.py | 2 + .../src/queries/query_helpers.py | 14 +++- .../src/queries/response_name_constants.py | 2 + .../src/tasks/index_aggregate_tips.py | 80 ++++++++++++++++--- 6 files changed, 123 insertions(+), 12 deletions(-) create mode 100644 discovery-provider/alembic/versions/f11f9e83b28b_add_supporter_and_supporting_to_.py diff --git a/discovery-provider/alembic/versions/f11f9e83b28b_add_supporter_and_supporting_to_.py b/discovery-provider/alembic/versions/f11f9e83b28b_add_supporter_and_supporting_to_.py new file mode 100644 index 00000000000..58439da08b4 --- /dev/null +++ b/discovery-provider/alembic/versions/f11f9e83b28b_add_supporter_and_supporting_to_.py @@ -0,0 +1,35 @@ +"""Add supporter and supporting to aggregate user + +Revision ID: f11f9e83b28b +Revises: 35198266d709 +Create Date: 2022-05-11 08:10:57.138022 + +""" +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "f11f9e83b28b" +down_revision = "35198266d709" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column( + "aggregate_user", + sa.Column("supporter_count", sa.Integer(), server_default="0", nullable=False), + ) + op.add_column( + "aggregate_user", + sa.Column("supporting_count", sa.Integer(), server_default="0", nullable=False), + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_column("aggregate_user", "supporting_count") + op.drop_column("aggregate_user", "supporter_count") + # ### end Alembic commands ### diff --git a/discovery-provider/src/api/v1/models/users.py b/discovery-provider/src/api/v1/models/users.py index 2d6821a2aec..76e91dfad0d 100644 --- a/discovery-provider/src/api/v1/models/users.py +++ b/discovery-provider/src/api/v1/models/users.py @@ -45,6 +45,8 @@ "is_deactivated": fields.Boolean(required=True), "erc_wallet": fields.String(requred=True), "spl_wallet": fields.String(required=True), + "supporter_count": fields.Integer(required=True), + "supporting_count": fields.Integer(required=True), }, ) diff --git a/discovery-provider/src/models/models.py b/discovery-provider/src/models/models.py index b6808622f11..3c7b4c70e8d 100644 --- a/discovery-provider/src/models/models.py +++ b/discovery-provider/src/models/models.py @@ -1103,6 +1103,8 @@ class AggregateUser(Base): following_count = Column(Integer, nullable=False) repost_count = Column(Integer, nullable=False) track_save_count = Column(Integer, nullable=False) + supporter_count = Column(Integer, nullable=False, server_default="0") + supporting_count = Column(Integer, nullable=False, server_default="0") Index("aggregate_user_idx", "user_id", unique=True) diff --git a/discovery-provider/src/queries/query_helpers.py b/discovery-provider/src/queries/query_helpers.py index ea90974719a..f80d18067ae 100644 --- a/discovery-provider/src/queries/query_helpers.py +++ b/discovery-provider/src/queries/query_helpers.py @@ -21,8 +21,8 @@ SaveType, Track, User, + UserBankAccount, ) -from src.models.user_bank import UserBankAccount from src.queries import response_name_constants from src.queries.get_balances import get_balances from src.queries.get_unpopulated_users import get_unpopulated_users, set_users_in_cache @@ -125,6 +125,8 @@ def populate_user_metadata( AggregateUser.following_count, AggregateUser.repost_count, AggregateUser.track_save_count, + AggregateUser.supporter_count, + AggregateUser.supporting_count, ) .filter(AggregateUser.user_id.in_(user_ids)) .all() @@ -146,6 +148,8 @@ def populate_user_metadata( response_name_constants.followee_count: following_count, response_name_constants.repost_count: repost_count, response_name_constants.track_save_count: track_save_count, + response_name_constants.supporter_count: supporter_count, + response_name_constants.supporting_count: supporting_count, } for ( user_id, @@ -156,6 +160,8 @@ def populate_user_metadata( following_count, repost_count, track_save_count, + supporter_count, + supporting_count, ) in aggregate_user } @@ -257,6 +263,12 @@ def populate_user_metadata( user[response_name_constants.track_save_count] = count_dict.get( user_id, {} ).get(response_name_constants.track_save_count, 0) + user[response_name_constants.supporter_count] = count_dict.get(user_id, {}).get( + response_name_constants.supporter_count, 0 + ) + user[response_name_constants.supporting_count] = count_dict.get( + user_id, {} + ).get(response_name_constants.supporting_count, 0) # current user specific user[ response_name_constants.does_current_user_follow diff --git a/discovery-provider/src/queries/response_name_constants.py b/discovery-provider/src/queries/response_name_constants.py index bad03761888..188d35657e4 100644 --- a/discovery-provider/src/queries/response_name_constants.py +++ b/discovery-provider/src/queries/response_name_constants.py @@ -48,6 +48,8 @@ waudio_balance = "waudio_balance" erc_wallet = "erc_wallet" spl_wallet = "spl_wallet" +supporter_count = "supporter_count" +supporting_count = "supporting_count" # current user specific # boolean - does current user follow given user diff --git a/discovery-provider/src/tasks/index_aggregate_tips.py b/discovery-provider/src/tasks/index_aggregate_tips.py index 0e0d6dda37d..432731e6d68 100644 --- a/discovery-provider/src/tasks/index_aggregate_tips.py +++ b/discovery-provider/src/tasks/index_aggregate_tips.py @@ -28,25 +28,83 @@ UPDATE_AGGREGATE_USER_TIPS_QUERY = """ +-- Update aggregate_tips table: INSERT INTO aggregate_user_tips ( sender_user_id , receiver_user_id , amount ) - SELECT - sender_user_id - , receiver_user_id - , SUM(amount) as amount +SELECT + sender_user_id + , receiver_user_id + , SUM(amount) as amount +FROM user_tips +WHERE + slot > :prev_slot AND + slot <= :current_slot +GROUP BY + sender_user_id + , receiver_user_id +ON CONFLICT (sender_user_id, receiver_user_id) +DO UPDATE + SET amount = EXCLUDED.amount + aggregate_user_tips.amount; + +-- Update aggregate_user supporter/supporting counts: +WITH recent_tips AS ( + SELECT + sender_user_id + , receiver_user_id FROM user_tips WHERE - slot > :prev_slot AND - slot <= :current_slot + slot > :prev_slot AND + slot <= :current_slot GROUP BY - sender_user_id - , receiver_user_id - ON CONFLICT (sender_user_id, receiver_user_id) - DO UPDATE - SET amount = EXCLUDED.amount + aggregate_user_tips.amount + sender_user_id + , receiver_user_id +) +, user_ids AS ( + SELECT sender_user_id AS user_id FROM recent_tips + UNION SELECT receiver_user_id AS user_id FROM recent_tips +), supporting AS ( + SELECT receiver_user_id AS user_id, COUNT(sender_user_id) AS total_supporting + FROM aggregate_user_tips + WHERE receiver_user_id IN (SELECT user_id FROM user_ids) + GROUP BY receiver_user_id +), supporters AS ( + SELECT sender_user_id AS user_id, COUNT(receiver_user_id) AS total_supporters + FROM aggregate_user_tips + WHERE sender_user_id IN (SELECT user_id FROM user_ids) + GROUP BY sender_user_id +) +INSERT INTO aggregate_user ( + user_id, + track_count, + playlist_count, + album_count, + follower_count, + following_count, + repost_count, + track_save_count, + supporter_count, + supporting_count +) +SELECT + supporting.user_id AS user_id, + 0 AS track_count, + 0 AS playlist_count, + 0 AS album_count, 0 AS follower_count, + 0 AS following_count, + 0 AS repost_count, + 0 AS track_save_count, + total_supporting AS supporting_count, + total_supporters AS supporter_count +FROM supporting +FULL OUTER JOIN supporters ON supporting.user_id = supporters.user_id +ON CONFLICT (user_id) +DO + UPDATE SET + supporting_count = EXCLUDED.supporting_count, + supporter_count = EXCLUDED.supporter_count """ From b645d4eb3baafd26354ee12377edead6cf299647 Mon Sep 17 00:00:00 2001 From: Marcus Pasell Date: Wed, 11 May 2022 09:59:32 +0000 Subject: [PATCH 2/6] update comments --- discovery-provider/src/queries/query_helpers.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/discovery-provider/src/queries/query_helpers.py b/discovery-provider/src/queries/query_helpers.py index f80d18067ae..765b86dd027 100644 --- a/discovery-provider/src/queries/query_helpers.py +++ b/discovery-provider/src/queries/query_helpers.py @@ -110,7 +110,7 @@ def parse_sort_param(base_query, model, whitelist_sort_params): # given list of user ids and corresponding users, populates each user object with: -# track_count, playlist_count, album_count, follower_count, followee_count, repost_count +# track_count, playlist_count, album_count, follower_count, followee_count, repost_count, supporter_count, supporting_count # if current_user_id available, populates does_current_user_follow, followee_follows def populate_user_metadata( session, user_ids, users, current_user_id, with_track_save_count=False @@ -138,7 +138,7 @@ def populate_user_metadata( ).filter(UserBankAccount.ethereum_address.in_(user["wallet"] for user in users)) user_banks_dict = dict(user_banks) - # build dict of user id --> track/playlist/album/follower/followee/repost/track save counts + # build dict of user id --> track/playlist/album/follower/followee/repost/track save/supporting/supporter counts count_dict = { user_id: { response_name_constants.track_count: track_count, From ac676b361c2a7aedd883b819c602ca39c87607da Mon Sep 17 00:00:00 2001 From: Marcus Pasell Date: Wed, 11 May 2022 10:25:50 +0000 Subject: [PATCH 3/6] Add does_current_user_support and does_support_current_user to user --- discovery-provider/src/api/v1/models/users.py | 2 ++ .../src/queries/query_helpers.py | 36 ++++++++++++++++++- .../src/queries/response_name_constants.py | 11 +++--- 3 files changed, 43 insertions(+), 6 deletions(-) diff --git a/discovery-provider/src/api/v1/models/users.py b/discovery-provider/src/api/v1/models/users.py index 76e91dfad0d..b30868de03e 100644 --- a/discovery-provider/src/api/v1/models/users.py +++ b/discovery-provider/src/api/v1/models/users.py @@ -75,6 +75,8 @@ "metadata_multihash": fields.String, "has_collectibles": fields.Boolean(required=True), "playlist_library": fields.Nested(playlist_library, allow_null=True), + "does_support_current_user": fields.Boolean(required=True), + "does_current_user_support": fields.Boolean(required=True), }, ) diff --git a/discovery-provider/src/queries/query_helpers.py b/discovery-provider/src/queries/query_helpers.py index 765b86dd027..1762792fae0 100644 --- a/discovery-provider/src/queries/query_helpers.py +++ b/discovery-provider/src/queries/query_helpers.py @@ -12,6 +12,7 @@ AggregatePlays, AggregateTrack, AggregateUser, + AggregateUserTips, Follow, Playlist, Remix, @@ -179,6 +180,8 @@ def populate_user_metadata( track_blocknumber_dict = dict(track_blocknumbers) follows_current_user_set = set() + supports_current_user_set = set() + current_user_supports_set = set() current_user_followed_user_ids = {} current_user_followee_follow_count_dict = {} if current_user_id: @@ -233,6 +236,29 @@ def populate_user_metadata( current_user_followee_follow_counts ) + supports_current_user_rows = ( + session.query( + AggregateUserTips.sender_user_id, AggregateUserTips.receiver_user_id + ) + .filter( + AggregateUserTips.receiver_user_id == current_user_id, + AggregateUserTips.sender_user_id.in_(user_ids), + ) + .all() + ) + current_user_supports_rows = ( + session.query( + AggregateUserTips.receiver_user_id, AggregateUserTips.sender_user_id + ) + .filter( + AggregateUserTips.sender_user_id == current_user_id, + AggregateUserTips.receiver_user_id.in_(user_ids), + ) + .all() + ) + supports_current_user_set = set([row[0] for row in supports_current_user_rows]) + current_user_supports_set = set([row[0] for row in current_user_supports_rows]) + balance_dict = get_balances(session, redis, user_ids) for user in users: @@ -294,7 +320,15 @@ def populate_user_metadata( user[response_name_constants.spl_wallet] = user_banks_dict.get( user["wallet"], None ) - user["does_follow_current_user"] = user_id in follows_current_user_set + user[response_name_constants.does_follow_current_user] = ( + user_id in follows_current_user_set + ) + user[response_name_constants.does_support_current_user] = ( + user_id in supports_current_user_set + ) + user[response_name_constants.does_current_user_support] = ( + user_id in current_user_supports_set + ) return users diff --git a/discovery-provider/src/queries/response_name_constants.py b/discovery-provider/src/queries/response_name_constants.py index 188d35657e4..7129f1a3767 100644 --- a/discovery-provider/src/queries/response_name_constants.py +++ b/discovery-provider/src/queries/response_name_constants.py @@ -19,11 +19,6 @@ # boolean - does the remix track author favorite the track has_remix_author_saved = "has_remix_author_saved" -# boolean - does current user follow given user -does_current_user_follow = "does_current_user_follow" -# integer - number of followees of current user that also follow given user -current_user_followee_follow_count = "current_user_followee_follow_count" - # user metadata user_id = "user_id" # integer - unique id of a user follower_count = "follower_count" # integer - total follower count of given user @@ -54,8 +49,14 @@ # current user specific # boolean - does current user follow given user does_current_user_follow = "does_current_user_follow" +# boolean - does given user follow current user +does_follow_current_user = "does_follow_current_user" # integer - number of followees of current user that also follow given user current_user_followee_follow_count = "current_user_followee_follow_count" +# boolean - has current user tipped given user +does_current_user_support = "does_current_user_support" +# boolean - has given user tipped current user +does_support_current_user = "does_support_current_user" # feed # string - timestamp of relevant activity on underlying object, used for sorting From 40cb3479b6437e08ca4264886d4d3567daded3f1 Mon Sep 17 00:00:00 2001 From: Marcus Pasell Date: Wed, 11 May 2022 19:16:18 +0000 Subject: [PATCH 4/6] Make column adding/dropping idempotent --- discovery-provider/alembic/env.py | 29 ++++++++++++++++++++--------- 1 file changed, 20 insertions(+), 9 deletions(-) diff --git a/discovery-provider/alembic/env.py b/discovery-provider/alembic/env.py index 4cc4bafc84b..ab05ddfb302 100644 --- a/discovery-provider/alembic/env.py +++ b/discovery-provider/alembic/env.py @@ -4,10 +4,10 @@ import re from alembic import context +from alembic.ddl.base import AddColumn, DropColumn, visit_add_column, visit_drop_column from sqlalchemy import engine_from_config, pool from sqlalchemy.ext.compiler import compiles from sqlalchemy.schema import CreateIndex, CreateTable, DropIndex, DropTable -from sqlalchemy.sql import ddl # add your model's MetaData object here # for 'autogenerate' support @@ -79,31 +79,42 @@ def run_migrations_online(): @compiles(CreateIndex) @compiles(CreateTable) +@compiles(AddColumn) def _add_if_not_exists(element, compiler, **kw): """Adds support for IF NOT EXISTS to CREATE TABLE and CREATE INDEX commands""" # Inspired by https://github.com/AudiusProject/audius-protocol/pull/2997 - if isinstance(element, ddl.CreateIndex): + if isinstance(element, CreateIndex): output = compiler.visit_create_index(element, **kw) - elif isinstance(element, ddl.CreateTable): + elif isinstance(element, CreateTable): output = compiler.visit_create_table(element, **kw) - if element.element.info.get("if_not_exists"): + elif isinstance(element, AddColumn): + output = visit_add_column(element, compiler, **kw) + if not hasattr(element, "element") or element.element.info.get("if_not_exists"): output = re.sub( - "CREATE (TABLE|INDEX)", r"CREATE \g<1> IF NOT EXISTS", output, re.S + "(CREATE|ADD) (TABLE|INDEX|COLUMN)", + r"\g<1> \g<2> IF NOT EXISTS", + output, + re.S, ) return output @compiles(DropIndex) @compiles(DropTable) +@compiles(DropColumn) def _add_if_exists(element, compiler, **kw): """Adds support for IF EXISTS to DROP TABLE and DROP INDEX commands""" # Inspired by https://github.com/AudiusProject/audius-protocol/pull/2997 - if isinstance(element, ddl.DropIndex): + if isinstance(element, DropIndex): output = compiler.visit_drop_index(element, **kw) - elif isinstance(element, ddl.DropTable): + elif isinstance(element, DropTable): output = compiler.visit_drop_table(element, **kw) - if element.element.info.get("if_exists"): - output = re.sub("DROP (TABLE|INDEX)", r"DROP \g<1> IF EXISTS", output, re.S) + elif isinstance(element, DropColumn): + output = visit_drop_column(element, compiler, **kw) + if not hasattr(element, "element") or element.element.info.get("if_exists"): + output = re.sub( + "DROP (TABLE|INDEX|COLUMN)", r"DROP \g<1> IF EXISTS", output, re.S + ) return output From 14839ecc879d92659a8d17ea835e4161dc3f02a8 Mon Sep 17 00:00:00 2001 From: Marcus Pasell Date: Wed, 11 May 2022 19:38:43 +0000 Subject: [PATCH 5/6] Make everything idempotent by default --- discovery-provider/alembic/env.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/discovery-provider/alembic/env.py b/discovery-provider/alembic/env.py index ab05ddfb302..3e2d1f1ad9d 100644 --- a/discovery-provider/alembic/env.py +++ b/discovery-provider/alembic/env.py @@ -89,14 +89,12 @@ def _add_if_not_exists(element, compiler, **kw): output = compiler.visit_create_table(element, **kw) elif isinstance(element, AddColumn): output = visit_add_column(element, compiler, **kw) - if not hasattr(element, "element") or element.element.info.get("if_not_exists"): - output = re.sub( + return re.sub( "(CREATE|ADD) (TABLE|INDEX|COLUMN)", r"\g<1> \g<2> IF NOT EXISTS", output, re.S, ) - return output @compiles(DropIndex) @@ -111,11 +109,9 @@ def _add_if_exists(element, compiler, **kw): output = compiler.visit_drop_table(element, **kw) elif isinstance(element, DropColumn): output = visit_drop_column(element, compiler, **kw) - if not hasattr(element, "element") or element.element.info.get("if_exists"): - output = re.sub( - "DROP (TABLE|INDEX|COLUMN)", r"DROP \g<1> IF EXISTS", output, re.S - ) - return output + return re.sub( + "DROP (TABLE|INDEX|COLUMN)", r"DROP \g<1> IF EXISTS", output, re.S + ) if context.is_offline_mode(): From 6b69a071f380c02c88ed65bdd8b124d1ad057663 Mon Sep 17 00:00:00 2001 From: Marcus Pasell Date: Wed, 11 May 2022 21:19:55 +0000 Subject: [PATCH 6/6] Remove does_current_user_support/does_support_current_user --- discovery-provider/src/api/v1/models/users.py | 2 -- .../src/queries/query_helpers.py | 32 ------------------- 2 files changed, 34 deletions(-) diff --git a/discovery-provider/src/api/v1/models/users.py b/discovery-provider/src/api/v1/models/users.py index b30868de03e..76e91dfad0d 100644 --- a/discovery-provider/src/api/v1/models/users.py +++ b/discovery-provider/src/api/v1/models/users.py @@ -75,8 +75,6 @@ "metadata_multihash": fields.String, "has_collectibles": fields.Boolean(required=True), "playlist_library": fields.Nested(playlist_library, allow_null=True), - "does_support_current_user": fields.Boolean(required=True), - "does_current_user_support": fields.Boolean(required=True), }, ) diff --git a/discovery-provider/src/queries/query_helpers.py b/discovery-provider/src/queries/query_helpers.py index 1762792fae0..b2118832646 100644 --- a/discovery-provider/src/queries/query_helpers.py +++ b/discovery-provider/src/queries/query_helpers.py @@ -12,7 +12,6 @@ AggregatePlays, AggregateTrack, AggregateUser, - AggregateUserTips, Follow, Playlist, Remix, @@ -180,8 +179,6 @@ def populate_user_metadata( track_blocknumber_dict = dict(track_blocknumbers) follows_current_user_set = set() - supports_current_user_set = set() - current_user_supports_set = set() current_user_followed_user_ids = {} current_user_followee_follow_count_dict = {} if current_user_id: @@ -236,29 +233,6 @@ def populate_user_metadata( current_user_followee_follow_counts ) - supports_current_user_rows = ( - session.query( - AggregateUserTips.sender_user_id, AggregateUserTips.receiver_user_id - ) - .filter( - AggregateUserTips.receiver_user_id == current_user_id, - AggregateUserTips.sender_user_id.in_(user_ids), - ) - .all() - ) - current_user_supports_rows = ( - session.query( - AggregateUserTips.receiver_user_id, AggregateUserTips.sender_user_id - ) - .filter( - AggregateUserTips.sender_user_id == current_user_id, - AggregateUserTips.receiver_user_id.in_(user_ids), - ) - .all() - ) - supports_current_user_set = set([row[0] for row in supports_current_user_rows]) - current_user_supports_set = set([row[0] for row in current_user_supports_rows]) - balance_dict = get_balances(session, redis, user_ids) for user in users: @@ -323,12 +297,6 @@ def populate_user_metadata( user[response_name_constants.does_follow_current_user] = ( user_id in follows_current_user_set ) - user[response_name_constants.does_support_current_user] = ( - user_id in supports_current_user_set - ) - user[response_name_constants.does_current_user_support] = ( - user_id in current_user_supports_set - ) return users