Skip to content

Commit

Permalink
In worker, sync fitbit data for users (#107)
Browse files Browse the repository at this point in the history
- Create worker directory, and move requirements & mypy.ini out into
python dir
- Move config.py to src/python
- Move models out to src/python
- Move wait-for-postgres out into src/python
- Remove unused import
- Fixup compose; implement worker
- Run worker correctly from context
- Correctly refresh token; also mark older requests as done
- Also update user's synced_at timestamp

Fixes #12.
  • Loading branch information
shaldengeki authored May 29, 2023
1 parent 1582bd7 commit 85119a3
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 20 deletions.
4 changes: 2 additions & 2 deletions fitbit-challenges/bin/reload-prod
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ set -ex
git reset --hard
git switch main
git pull origin main
docker compose -f docker-compose.yaml -f docker-compose.prod.yaml build api frontend
docker compose -f docker-compose.yaml -f docker-compose.prod.yaml up --no-deps -d api frontend migration
docker compose -f docker-compose.yaml -f docker-compose.prod.yaml build api frontend worker
docker compose -f docker-compose.yaml -f docker-compose.prod.yaml up --no-deps -d api frontend migration worker
6 changes: 0 additions & 6 deletions fitbit-challenges/docker-compose.override.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,6 @@ services:
migration:
<<: *api
ports: []
environment:
- FLASK_DEBUG=True
- DB_HOST=pg
- DB_USERNAME=admin
- DB_PASSWORD=development
- DATABASE_NAME=api_development
worker:
<<: *api
ports: []
Expand Down
103 changes: 91 additions & 12 deletions fitbit-challenges/src/python/worker/app.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
from base64 import b64encode
import datetime
from datetime import timezone
import requests
from sqlalchemy import desc, update
from sqlalchemy.sql.functions import now
import time
from typing import Optional
from urllib.parse import urlencode

from ..config import app, db
from ..models import SubscriptionNotification, User, UserActivity
Expand All @@ -12,7 +16,7 @@ def maybe_fetch_subscription_notification() -> Optional[SubscriptionNotification
# Lock a job, if it exists.
notification = (
SubscriptionNotification.query.filter(
SubscriptionNotification.processed_at is None
SubscriptionNotification.processed_at == None
)
.order_by(desc(SubscriptionNotification.created_at))
.first()
Expand All @@ -22,7 +26,7 @@ def maybe_fetch_subscription_notification() -> Optional[SubscriptionNotification
return None

print(f"Subscription notification to process: {notification.id}")
notification.processed_at = now
notification.processed_at = datetime.datetime.now().astimezone(timezone.utc)
db.session.add(notification)
db.session.commit()
print(f"Notification locked.")
Expand All @@ -36,32 +40,92 @@ def fetch_user_for_notification(
return User.query.filter(User.fitbit_user_id == notification.fitbit_user_id).first()


def request_indicates_expired_token(response: dict) -> bool:
return "errors" in response and any(
e["errorType"] == "expired_token" for e in response["errors"]
)


def refresh_tokens_for_user(user: User, client_id: str, client_secret: str) -> User:
encoded_client_and_secret = b64encode(
f"{client_id}:{client_secret}".encode("utf-8")
).decode("utf-8")

url_parameters = urlencode(
{
"grant_type": "refresh_token",
"refresh_token": user.fitbit_refresh_token,
}
)

response = requests.post(
"https://api.fitbit.com/oauth2/token",
headers={
"Content-Type": "application/x-www-form-urlencoded",
"Authorization": f"Basic {encoded_client_and_secret}",
},
data=url_parameters,
)

if response.status_code not in (200, 201):
raise ValueError(f"Error when refreshing user tokens: {response.json()}")

data = response.json()
user.fitbit_access_token = data["access_token"]
user.fitbit_refresh_token = data["refresh_token"]
db.session.add(user)
db.session.commit()

return user


def fetch_user_activity_for_notification(
notification: SubscriptionNotification,
user: User,
client_id: str,
client_secret: str,
) -> dict:
formatted_date: str = notification.date.strftime("%Y-%m-%d")
user = fetch_user_for_notification(notification)
if user is None:
return []

print(f"Fetching {notification.fitbit_user_id}'s activity for {formatted_date}.")
data = requests.get(
f"https://api.fitbit.com/1/user/{notification.fitbit_user_id}/activities/date/{formatted_date}.json",
headers={"Authorization": f"Bearer {user.fitbit_access_token}"},
).json()
print(f"Fetched data: {data}")

if request_indicates_expired_token(data):
print(f"Refreshing expired token.")
user = refresh_tokens_for_user(user, client_id, client_secret)
print(
f"Fetching {notification.fitbit_user_id}'s activity for {formatted_date}."
)
data = requests.get(
f"https://api.fitbit.com/1/user/{notification.fitbit_user_id}/activities/date/{formatted_date}.json",
headers={"Authorization": f"Bearer {user.fitbit_access_token}"},
).json()

return data


def process_subscription_notifications() -> None:
def process_subscription_notifications(client_id: str, client_secret: str) -> None:
notification = maybe_fetch_subscription_notification()
if notification is None:
print("No subscription notifications to process, skipping.")
return

try:
# Fetch the user's activity for this date.
activity = fetch_user_activity_for_notification(notification)
user = fetch_user_for_notification(notification)
if user is None:
print(
f"No user found for notification {notification}, marking as done and skipping."
)
notification.processed_at = datetime.datetime.now().astimezone(timezone.utc)
db.session.add(notification)
db.session.commit()
return None
activity = fetch_user_activity_for_notification(
notification, user, client_id, client_secret
)
active_minutes: int = (
activity["summary"]["veryActiveMinutes"]
+ activity["summary"]["fairlyActiveMinutes"]
Expand All @@ -82,26 +146,41 @@ def process_subscription_notifications() -> None:
active_minutes=active_minutes,
distance_km=distance_km,
)
user.synced_at = datetime.datetime.now().astimezone(timezone.utc)

print(f"Recording new activity: {new_activity}")
print(f"Recording new activity.")
db.session.add(new_activity)
db.session.add(user)
db.session.commit()
except:
notification.processed_at = None
db.session.add(notification)
db.session.commit()
raise

# Mark all older notifications than this one as done, too.
update_older_notifications = (
update(SubscriptionNotification)
.where(SubscriptionNotification.fitbit_user_id == notification.fitbit_user_id)
.where(SubscriptionNotification.date == notification.date)
.where(SubscriptionNotification.created_at <= notification.created_at)
.values(processed_at=datetime.datetime.now().astimezone(timezone.utc))
)
db.session.execute(update_older_notifications)


max_delay = 10


def main() -> int:
with app.app_context():
client_id = app.config["FITBIT_CLIENT_ID"]
client_secret = app.config["FITBIT_CLIENT_SECRET"]

while True:
start = time.time()
process_subscription_notifications()
delay = (time.time() + max_delay) - start
process_subscription_notifications(client_id, client_secret)
delay = (start + max_delay) - time.time()
if delay > 0:
print(f"Sleeping for {round(delay)} seconds")
time.sleep(delay)
Expand Down

0 comments on commit 85119a3

Please sign in to comment.