Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add supporter metadata to user #3077

Merged
merged 6 commits into from
May 11, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 20 additions & 9 deletions discovery-provider/alembic/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
import re

from alembic import context
from alembic.ddl.base import AddColumn, DropColumn, visit_add_column, visit_drop_column
from sqlalchemy import engine_from_config, pool
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.schema import CreateIndex, CreateTable, DropIndex, DropTable
from sqlalchemy.sql import ddl

# add your model's MetaData object here
# for 'autogenerate' support
Expand Down Expand Up @@ -79,31 +79,42 @@ def run_migrations_online():

@compiles(CreateIndex)
@compiles(CreateTable)
@compiles(AddColumn)
def _add_if_not_exists(element, compiler, **kw):
"""Adds support for IF NOT EXISTS to CREATE TABLE and CREATE INDEX commands"""
# Inspired by https://github.com/AudiusProject/audius-protocol/pull/2997
if isinstance(element, ddl.CreateIndex):
if isinstance(element, CreateIndex):
output = compiler.visit_create_index(element, **kw)
elif isinstance(element, ddl.CreateTable):
elif isinstance(element, CreateTable):
output = compiler.visit_create_table(element, **kw)
if element.element.info.get("if_not_exists"):
elif isinstance(element, AddColumn):
output = visit_add_column(element, compiler, **kw)
if not hasattr(element, "element") or element.element.info.get("if_not_exists"):
output = re.sub(
"CREATE (TABLE|INDEX)", r"CREATE \g<1> IF NOT EXISTS", output, re.S
"(CREATE|ADD) (TABLE|INDEX|COLUMN)",
r"\g<1> \g<2> IF NOT EXISTS",
output,
re.S,
)
return output


@compiles(DropIndex)
@compiles(DropTable)
@compiles(DropColumn)
def _add_if_exists(element, compiler, **kw):
"""Adds support for IF EXISTS to DROP TABLE and DROP INDEX commands"""
# Inspired by https://github.com/AudiusProject/audius-protocol/pull/2997
if isinstance(element, ddl.DropIndex):
if isinstance(element, DropIndex):
output = compiler.visit_drop_index(element, **kw)
elif isinstance(element, ddl.DropTable):
elif isinstance(element, DropTable):
output = compiler.visit_drop_table(element, **kw)
if element.element.info.get("if_exists"):
output = re.sub("DROP (TABLE|INDEX)", r"DROP \g<1> IF EXISTS", output, re.S)
elif isinstance(element, DropColumn):
output = visit_drop_column(element, compiler, **kw)
if not hasattr(element, "element") or element.element.info.get("if_exists"):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking of making add/drop columns idempotent by default, as alembic doesn't give us an element with kwargs to work with here. In which case, it might make sense to make create/drop index/table idempotent by default too? CC: @isaacsolo / @jowlee ?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that makes sense to me! this is pretty cool. are you suggesting we use ORM for DDL stuff instead of raw SQL?

Copy link
Contributor Author

@rickyrombo rickyrombo May 11, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ultimate goal is to write declarative models then use alembic revision --autogenerate and have migrations made for us!

output = re.sub(
"DROP (TABLE|INDEX|COLUMN)", r"DROP \g<1> IF EXISTS", output, re.S
)
return output


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
"""Add supporter and supporting to aggregate user

Revision ID: f11f9e83b28b
Revises: 35198266d709
Create Date: 2022-05-11 08:10:57.138022

"""
import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "f11f9e83b28b"
down_revision = "35198266d709"
branch_labels = None
depends_on = None


def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"aggregate_user",
sa.Column("supporter_count", sa.Integer(), server_default="0", nullable=False),
)
op.add_column(
"aggregate_user",
sa.Column("supporting_count", sa.Integer(), server_default="0", nullable=False),
)
# ### end Alembic commands ###


def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("aggregate_user", "supporting_count")
op.drop_column("aggregate_user", "supporter_count")
# ### end Alembic commands ###
4 changes: 4 additions & 0 deletions discovery-provider/src/api/v1/models/users.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
"is_deactivated": fields.Boolean(required=True),
"erc_wallet": fields.String(requred=True),
"spl_wallet": fields.String(required=True),
"supporter_count": fields.Integer(required=True),
"supporting_count": fields.Integer(required=True),
},
)

Expand Down Expand Up @@ -73,6 +75,8 @@
"metadata_multihash": fields.String,
"has_collectibles": fields.Boolean(required=True),
"playlist_library": fields.Nested(playlist_library, allow_null=True),
"does_support_current_user": fields.Boolean(required=True),
"does_current_user_support": fields.Boolean(required=True),
},
)

Expand Down
2 changes: 2 additions & 0 deletions discovery-provider/src/models/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,8 @@ class AggregateUser(Base):
following_count = Column(Integer, nullable=False)
repost_count = Column(Integer, nullable=False)
track_save_count = Column(Integer, nullable=False)
supporter_count = Column(Integer, nullable=False, server_default="0")
supporting_count = Column(Integer, nullable=False, server_default="0")

Index("aggregate_user_idx", "user_id", unique=True)

Expand Down
54 changes: 50 additions & 4 deletions discovery-provider/src/queries/query_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
AggregatePlays,
AggregateTrack,
AggregateUser,
AggregateUserTips,
Follow,
Playlist,
Remix,
Expand All @@ -21,8 +22,8 @@
SaveType,
Track,
User,
UserBankAccount,
)
from src.models.user_bank import UserBankAccount
from src.queries import response_name_constants
from src.queries.get_balances import get_balances
from src.queries.get_unpopulated_users import get_unpopulated_users, set_users_in_cache
Expand Down Expand Up @@ -110,7 +111,7 @@ def parse_sort_param(base_query, model, whitelist_sort_params):


# given list of user ids and corresponding users, populates each user object with:
# track_count, playlist_count, album_count, follower_count, followee_count, repost_count
# track_count, playlist_count, album_count, follower_count, followee_count, repost_count, supporter_count, supporting_count
# if current_user_id available, populates does_current_user_follow, followee_follows
def populate_user_metadata(
session, user_ids, users, current_user_id, with_track_save_count=False
Expand All @@ -125,6 +126,8 @@ def populate_user_metadata(
AggregateUser.following_count,
AggregateUser.repost_count,
AggregateUser.track_save_count,
AggregateUser.supporter_count,
AggregateUser.supporting_count,
)
.filter(AggregateUser.user_id.in_(user_ids))
.all()
Expand All @@ -136,7 +139,7 @@ def populate_user_metadata(
).filter(UserBankAccount.ethereum_address.in_(user["wallet"] for user in users))
user_banks_dict = dict(user_banks)

# build dict of user id --> track/playlist/album/follower/followee/repost/track save counts
# build dict of user id --> track/playlist/album/follower/followee/repost/track save/supporting/supporter counts
count_dict = {
user_id: {
response_name_constants.track_count: track_count,
Expand All @@ -146,6 +149,8 @@ def populate_user_metadata(
response_name_constants.followee_count: following_count,
response_name_constants.repost_count: repost_count,
response_name_constants.track_save_count: track_save_count,
response_name_constants.supporter_count: supporter_count,
response_name_constants.supporting_count: supporting_count,
}
for (
user_id,
Expand All @@ -156,6 +161,8 @@ def populate_user_metadata(
following_count,
repost_count,
track_save_count,
supporter_count,
supporting_count,
) in aggregate_user
}

Expand All @@ -173,6 +180,8 @@ def populate_user_metadata(
track_blocknumber_dict = dict(track_blocknumbers)

follows_current_user_set = set()
supports_current_user_set = set()
current_user_supports_set = set()
current_user_followed_user_ids = {}
current_user_followee_follow_count_dict = {}
if current_user_id:
Expand Down Expand Up @@ -227,6 +236,29 @@ def populate_user_metadata(
current_user_followee_follow_counts
)

supports_current_user_rows = (
session.query(
AggregateUserTips.sender_user_id, AggregateUserTips.receiver_user_id
)
.filter(
AggregateUserTips.receiver_user_id == current_user_id,
AggregateUserTips.sender_user_id.in_(user_ids),
)
.all()
)
rickyrombo marked this conversation as resolved.
Show resolved Hide resolved
current_user_supports_rows = (
session.query(
AggregateUserTips.receiver_user_id, AggregateUserTips.sender_user_id
)
.filter(
AggregateUserTips.sender_user_id == current_user_id,
AggregateUserTips.receiver_user_id.in_(user_ids),
)
.all()
)
supports_current_user_set = set([row[0] for row in supports_current_user_rows])
current_user_supports_set = set([row[0] for row in current_user_supports_rows])

balance_dict = get_balances(session, redis, user_ids)

for user in users:
Expand Down Expand Up @@ -257,6 +289,12 @@ def populate_user_metadata(
user[response_name_constants.track_save_count] = count_dict.get(
user_id, {}
).get(response_name_constants.track_save_count, 0)
user[response_name_constants.supporter_count] = count_dict.get(user_id, {}).get(
response_name_constants.supporter_count, 0
)
user[response_name_constants.supporting_count] = count_dict.get(
user_id, {}
).get(response_name_constants.supporting_count, 0)
# current user specific
user[
response_name_constants.does_current_user_follow
Expand All @@ -282,7 +320,15 @@ def populate_user_metadata(
user[response_name_constants.spl_wallet] = user_banks_dict.get(
user["wallet"], None
)
user["does_follow_current_user"] = user_id in follows_current_user_set
user[response_name_constants.does_follow_current_user] = (
user_id in follows_current_user_set
)
user[response_name_constants.does_support_current_user] = (
user_id in supports_current_user_set
)
user[response_name_constants.does_current_user_support] = (
user_id in current_user_supports_set
)

return users

Expand Down
13 changes: 8 additions & 5 deletions discovery-provider/src/queries/response_name_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,6 @@
# boolean - does the remix track author favorite the track
has_remix_author_saved = "has_remix_author_saved"

# boolean - does current user follow given user
does_current_user_follow = "does_current_user_follow"
# integer - number of followees of current user that also follow given user
current_user_followee_follow_count = "current_user_followee_follow_count"

# user metadata
user_id = "user_id" # integer - unique id of a user
follower_count = "follower_count" # integer - total follower count of given user
Expand All @@ -48,12 +43,20 @@
waudio_balance = "waudio_balance"
erc_wallet = "erc_wallet"
spl_wallet = "spl_wallet"
supporter_count = "supporter_count"
supporting_count = "supporting_count"

# current user specific
# boolean - does current user follow given user
does_current_user_follow = "does_current_user_follow"
# boolean - does given user follow current user
does_follow_current_user = "does_follow_current_user"
# integer - number of followees of current user that also follow given user
current_user_followee_follow_count = "current_user_followee_follow_count"
# boolean - has current user tipped given user
does_current_user_support = "does_current_user_support"
# boolean - has given user tipped current user
does_support_current_user = "does_support_current_user"

# feed
# string - timestamp of relevant activity on underlying object, used for sorting
Expand Down
80 changes: 69 additions & 11 deletions discovery-provider/src/tasks/index_aggregate_tips.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,25 +28,83 @@


UPDATE_AGGREGATE_USER_TIPS_QUERY = """
-- Update aggregate_tips table:
INSERT INTO aggregate_user_tips (
sender_user_id
, receiver_user_id
, amount
)
SELECT
sender_user_id
, receiver_user_id
, SUM(amount) as amount
SELECT
sender_user_id
, receiver_user_id
, SUM(amount) as amount
FROM user_tips
WHERE
slot > :prev_slot AND
slot <= :current_slot
GROUP BY
sender_user_id
, receiver_user_id
ON CONFLICT (sender_user_id, receiver_user_id)
DO UPDATE
SET amount = EXCLUDED.amount + aggregate_user_tips.amount;

-- Update aggregate_user supporter/supporting counts:
WITH recent_tips AS (
SELECT
sender_user_id
, receiver_user_id
FROM user_tips
WHERE
slot > :prev_slot AND
slot <= :current_slot
slot > :prev_slot AND
slot <= :current_slot
GROUP BY
sender_user_id
, receiver_user_id
ON CONFLICT (sender_user_id, receiver_user_id)
DO UPDATE
SET amount = EXCLUDED.amount + aggregate_user_tips.amount
sender_user_id
, receiver_user_id
)
, user_ids AS (
SELECT sender_user_id AS user_id FROM recent_tips
UNION SELECT receiver_user_id AS user_id FROM recent_tips
), supporting AS (
SELECT receiver_user_id AS user_id, COUNT(sender_user_id) AS total_supporting
FROM aggregate_user_tips
WHERE receiver_user_id IN (SELECT user_id FROM user_ids)
GROUP BY receiver_user_id
), supporters AS (
SELECT sender_user_id AS user_id, COUNT(receiver_user_id) AS total_supporters
FROM aggregate_user_tips
WHERE sender_user_id IN (SELECT user_id FROM user_ids)
GROUP BY sender_user_id
)
INSERT INTO aggregate_user (
user_id,
track_count,
playlist_count,
album_count,
follower_count,
following_count,
repost_count,
track_save_count,
supporter_count,
supporting_count
)
SELECT
supporting.user_id AS user_id,
0 AS track_count,
0 AS playlist_count,
0 AS album_count, 0 AS follower_count,
0 AS following_count,
0 AS repost_count,
0 AS track_save_count,
total_supporting AS supporting_count,
total_supporters AS supporter_count
FROM supporting
FULL OUTER JOIN supporters ON supporting.user_id = supporters.user_id
ON CONFLICT (user_id)
DO
UPDATE SET
supporting_count = EXCLUDED.supporting_count,
supporter_count = EXCLUDED.supporter_count
"""


Expand Down