Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert raw sql to sqlalchemy for standings #8

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
239 changes: 122 additions & 117 deletions espn_ffb/db/query.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from collections import namedtuple
from espn_ffb.db.model.champions import Champions
from espn_ffb.db.model.matchups import Matchups
from espn_ffb.db.model.owners import Owners
Expand All @@ -7,7 +6,37 @@
from espn_ffb.db.model.teams import Teams
from sqlalchemy import case, desc, func
from sqlalchemy.dialects.postgresql import insert as pg_insert
from typing import Sequence
from typing import Dict, List, NamedTuple, Optional, Set, Sequence


class H2HRecord(NamedTuple):
opponent_name: str
wins: int
losses: int


class StandingsRecord(NamedTuple):
owner_id: int
wins: int
losses: int
ties: int
win_percentage: float
points_for: float
points_against: float
avg_points_for: float
avg_points_against: float
championships: float
sackos: float


class WinLossRecord(NamedTuple):
wins: int
losses: int


class WinStreakRecord(NamedTuple):
streak: int
streak_owner: Optional[int]


class Query:
Expand All @@ -27,21 +56,22 @@ def get_champions(self):

return champions

def get_distinct_matchup_team_ids(self, year: int):
def get_distinct_matchup_team_ids(self, year: int) -> Dict[str, Set[int]]:
distinct_matchup_team_ids = dict()
matchups = self.get_matchups(year)
for matchup in matchups:
if matchup.matchup_id not in distinct_matchup_team_ids:
distinct_matchup_team_ids[matchup.matchup_id] = {matchup.team_id}
else:
if (matchup.opponent_team_id is None) or (
matchup.opponent_team_id not in distinct_matchup_team_ids.get(matchup.matchup_id)):
matchup.opponent_team_id not in distinct_matchup_team_ids[matchup.matchup_id]):
distinct_matchup_team_ids[matchup.matchup_id].add(matchup.team_id)

return distinct_matchup_team_ids

def get_h2h_record_current(self, matchups, year, week):
Record = namedtuple('Record', ['wins', 'losses'])
def get_h2h_record_current(
self, matchups: Sequence[Matchups], year: int, week: int
) -> List[WinLossRecord]:
h2h_record = list()
for m in matchups:
wins, losses = 0, 0
Expand All @@ -54,14 +84,13 @@ def get_h2h_record_current(self, matchups, year, week):
else:
losses += 1

h2h_record.append(Record(wins, losses))
h2h_record.append(WinLossRecord(wins, losses))

return h2h_record

def get_h2h_records(self, owner_id, is_playoffs):
COLUMN_NAMES = ['opponent_name', 'wins', 'losses']
Record = namedtuple('H2HRecord', COLUMN_NAMES)

def get_h2h_records(
self, owner_id: int, is_playoffs: bool
) -> List[H2HRecord]:
full_name_concat = func.CONCAT(Owners.first_name, ' ', Owners.last_name)

owner_name = (
Expand Down Expand Up @@ -117,7 +146,7 @@ def get_h2h_records(self, owner_id, is_playoffs):
.order_by(record_subquery.c.opponent_name)
)

return [Record(*r) for r in h2h_records_query]
return [H2HRecord(*r) for r in h2h_records_query]

def get_matchup_history(self, owner_id, opponent_owner_id, is_playoffs):
matchups = self.db.session.query(Matchups) \
Expand Down Expand Up @@ -146,131 +175,107 @@ def get_matchups(self, year: int) -> Sequence[Matchups]:
def get_owners(self):
return self.db.session.query(Owners).all()

def get_records(self, year: int) -> Sequence[Records]:
def get_records(self, year: Optional[int]) -> Sequence[Records]:
"""
Select records for a given year.
Select records for a given year or all years if year is None.

:param year: the year
:return: list of records
"""
records = self.db.session.query(Records).filter_by(year=year).all()
return records
records_query = self.db.session.query(Records)
if year:
records_query = records_query.filter_by(year=year)
return records_query.all()

def get_sacko_current(self):
sacko = self.db.session.query(Sackos) \
.order_by(desc(Sackos.year)) \
.first()
return sacko

def get_standings(self, year: int):
COLUMN_NAMES = ["owner_id", "wins", "losses", "win_percentage", "points_for", "points_against", "avg_points_for",
"avg_points_against", "championships", "sackos"]
Record = namedtuple('Standings', COLUMN_NAMES)

query = f"""
select
r.owner_id as owner_id,
r.wins as wins,
r.losses as losses,
case
when (r.wins + r.losses) = 0
then 0
else
round(r.wins::decimal/(r.wins + r.losses) , 4)
end as win_percentage,
r.points_for as points_for,
r.points_against as points_against,
case
when (r.wins + r.losses) = 0
then 0
else
round(r.points_for/(r.wins + r.losses), 2)
end as avg_points_for,
case
when (r.wins + r.losses) = 0
then 0
else
round(r.points_against/(r.wins + r.losses), 2)
end as avg_points_against,
(select count(1) from champions where owner_id = r.owner_id and year = r.year) as championships,
(select count(1) from sackos where owner_id = r.owner_id and year = r.year) as sackos
from
records r
where
r.year = {year}
group by
r.year,
r.owner_id,
r.wins,
r.losses,
r.points_for,
r.points_against
order by
win_percentage desc,
points_for desc
"""
def get_standings(
self, year: Optional[int] = None
) -> List[StandingsRecord]:
owners = self.get_owners()
records = self.get_records(year=year)

standings = []
for owner in owners:
owners_records = [r for r in records if r.owner_id == owner.id]
# Skip owner without records. Common when viewing standings for a
# year where an owner did not participate.
if not owners_records:
continue

wins = sum(r.wins for r in owners_records)
losses = sum(r.losses for r in owners_records)
ties = sum(r.ties for r in owners_records)
total_games = wins + losses + ties

points_for = sum(r.points_for for r in owners_records)
points_against = sum(r.points_against for r in owners_records)

avg_points_for = float(0)
avg_points_against = float(0)
win_percentage = float(0)
if total_games > 0:
win_percentage = float(f"{wins / total_games:.4f}")
avg_points_for = float(f"{points_for / total_games:.2f}")
avg_points_against = float(f"{points_against / total_games:.2f}")

sackos_query = self.db.session.query(Sackos).filter_by(
owner_id=owner.id
)
champions_query = self.db.session.query(Champions).filter_by(
owner_id=owner.id
)
if year:
sackos_query = sackos_query.filter_by(year=year)
champions_query = champions_query.filter_by(year=year)

standings.append(
StandingsRecord(
owner.id,
wins,
losses,
ties,
win_percentage,
points_for,
points_against,
avg_points_for,
avg_points_against,
champions_query.count(),
sackos_query.count(),
)
)

return [Record(**r) for r in self.db.engine.execute(query)]

def get_standings_overall(self):
COLUMN_NAMES = ["owner_id", "wins", "losses", "win_percentage", "points_for", "points_against", "avg_points_for",
"avg_points_against", "championships", "sackos"]
Record = namedtuple('Standings', COLUMN_NAMES)

query = f"""
select
r.owner_id as owner_id,
sum(r.wins) as wins,
sum(r.losses) as losses,
case
when (sum(r.wins) + sum(r.losses)) = 0
then 0
else
round(sum(r.wins)::decimal/(sum(r.wins) + sum(r.losses)) , 4)
end as win_percentage,
sum(r.points_for) as points_for,
sum(r.points_against) as points_against,
case
when (sum(r.wins) + sum(r.losses)) = 0
then 0
else
round(sum(r.points_for)/(sum(r.wins) + sum(r.losses)), 2)
end as avg_points_for,
case
when (sum(r.wins) + sum(r.losses)) = 0
then 0
else
round(sum(r.points_against)/(sum(r.wins) + sum(r.losses)), 2)
end as avg_points_against,
(select count(1) from champions where owner_id = r.owner_id) as championships,
(select count(1) from sackos where owner_id = r.owner_id) as sackos
from
records r
group by
r.owner_id
order by
win_percentage desc,
avg_points_for desc
"""
standings.sort(
key=lambda x: (x.win_percentage, x.points_for), reverse=True
)

return [Record(**r) for r in self.db.engine.execute(query)]
return standings

def get_team_id_to_record(self, year, week):
team_id_to_record = dict()
Record = namedtuple('Record', ['wins', 'losses'])
def get_team_id_to_record(
self, year: int, week: int
) -> Dict[str, WinLossRecord]:
team_id_to_record: Dict[str, WinLossRecord] = dict()
matchups = self.get_matchups(year)
for m in matchups:
if m.team_id in team_id_to_record:
r = team_id_to_record.get(m.team_id)
r = team_id_to_record[m.team_id]
else:
r = Record(0, 0)
r = WinLossRecord(0, 0)
team_id_to_record[m.team_id] = r

if m.matchup_id < week:
if m.team_score > m.opponent_team_score:
team_id_to_record[m.team_id] = Record(r.wins + 1, r.losses)
team_id_to_record[m.team_id] = WinLossRecord(
r.wins + 1, r.losses
)
if m.team_score < m.opponent_team_score:
team_id_to_record[m.team_id] = Record(r.wins, r.losses + 1)
team_id_to_record[m.team_id] = WinLossRecord(
r.wins, r.losses + 1
)

return team_id_to_record

Expand All @@ -288,10 +293,10 @@ def get_teams(self, year: int):
teams = self.db.session.query(Teams).filter_by(year=year).all()
return teams

def get_win_streak_by_year(self, matchups, year, week):
def get_win_streak_by_year(
self, matchups: Sequence[Matchups], year: int, week: int
) -> List[WinStreakRecord]:
win_streaks = list()
WinStreak = namedtuple('WinStreak', ['streak', 'streak_owner'])

for m in matchups:
streak, streak_owner, streak_type = 0, None, None
h2h_history = [h2h for h2h in self.get_matchup_history(m.owner_id, m.opponent_owner_id, False) if
Expand All @@ -307,7 +312,7 @@ def get_win_streak_by_year(self, matchups, year, week):
break

streak += 1
win_streaks.append(WinStreak(streak, streak_owner))
win_streaks.append(WinStreakRecord(streak, streak_owner))

return win_streaks

Expand Down
6 changes: 3 additions & 3 deletions espn_ffb/views/standings.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from flask import Blueprint, current_app, render_template

standings = Blueprint("standings", __name__, template_folder="templates")
TABLE_HEADERS = ["Name", "Wins", "Losses", "Win Percentage", "Points For", "Points Against", "Average Points For",
TABLE_HEADERS = ["Name", "Wins", "Losses", "Ties", "Win Percentage", "Points For", "Points Against", "Average Points For",
"Average Points Against", "Championships", "Sackos"]
COLUMN_NAMES = ["owner_id", "wins", "losses", "win_percentage", "points_for", "points_against", "avg_points_for",
COLUMN_NAMES = ["owner_id", "wins", "losses", "ties", "win_percentage", "points_for", "points_against", "avg_points_for",
"avg_points_against", "championships", "sackos"]


Expand All @@ -20,7 +20,7 @@ def show(year: str):
sacko = query.get_sacko_current()

if year == "overall":
records = query.get_standings_overall()
records = query.get_standings()
else:
records = query.get_standings(year=int(year))

Expand Down