Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add backfill functionality #1

Merged
merged 5 commits into from
Jul 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
.github/
.git/
serverless/
compose/
configs/
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ COPY cmd/ ./cmd/
COPY internal/ ./internal/
# TODO(zeke): if we introduce pkg dir, need to copy it here

# -tags timetzdata is for embedding tz info for LoadLocation call in binary
RUN CGO_ENABLED=0 GOOS=linux GOEXPERIMENT=rangefunc go build \
-tags=jsoniter -v -o /zest-api ./cmd/
-tags=jsoniter -tags timetzdata -v -o /zest-api ./cmd/

FROM scratch

Expand Down
15 changes: 13 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
##################
## docker commands
##################
DOCKER=sudo docker
#DOCKER=sudo docker
DOCKER=docker
ifeq ($(shell uname -s),Linux)
DOCKER := sudo docker
endif
COMPOSE=$(DOCKER) compose

.PHONY: build up up-debug up-monitoring clean down down-with-volumes
Expand Down Expand Up @@ -35,7 +39,7 @@ GFLAGS=-tags=jsoniter
GVARS=GOEXPERIMENT=rangefunc
GORUN=$(GVARS) go run $(GFLAGS)

.PHONY: fmt run help test scrape dump
.PHONY: fmt run help test scrape dump backfill

fmt:
go mod tidy
Expand All @@ -57,6 +61,13 @@ scrape:
dump:
$(GORUN) ./cmd dump

backfill:
CREDS=--username=$ZEST_USERNAME --password=$ZEST_PASSWORD
#$(GORUN) ./cmd backfill --help
#$(GORUN) ./cmd backfill --resource=reddit $(CREDS)
$(GORUN) ./cmd backfill --resource=spotify $(CREDS) \
--start=2024-04-04 --end=2024-05-28

##################
## deploy commands
##################
Expand Down
96 changes: 96 additions & 0 deletions cmd/backfill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
package main

import (
"bytes"
"context"
"fmt"
"log/slog"
"net/http"
"net/http/cookiejar"
"slices"
"time"

jsoniter "github.com/json-iterator/go"
)

type RangeParams struct {
Start string `short:"s" help:"start of backfill if necessesary"`
End string `short:"e" help:"end of backfill if necessary"`
}

type BackfillCmd struct {
Username string `short:"u" env:"ZEST_USERNAME" help:"username to sign into backend"`
Password string `short:"p" env:"ZEST_PASSWORD" help:"password to sign into backend"`
Resource string `short:"r" help:"resource to hit"`
RangeParams
}

func (r *BackfillCmd) Run() error {
ctx := context.Background()
logger := slog.Default()

if !slices.Contains([]string{"reddit", "metacritic", "spotify"}, r.Resource) {
return fmt.Errorf("invalid event type: %v", r.Resource)
}

// login first!
jar, err := cookiejar.New(nil)
if err != nil {
return fmt.Errorf("error making cookie jar: %v", err)
}

var bs bytes.Buffer
if err := jsoniter.NewEncoder(&bs).Encode(struct {
Username string `json:"username"`
Password string `json:"password"`
}{
Username: r.Username,
Password: r.Password,
}); err != nil {
return fmt.Errorf("error encoding credentials: %v", err)
}

logger.Info("logging into server")
uri := "https://api.zekereyna.dev/login"
req, err := http.NewRequestWithContext(ctx, http.MethodPost, uri, &bs)
if err != nil {
return fmt.Errorf("error making login request: %v", err)
}

client := http.Client{
Timeout: 60 * time.Second,
Jar: jar,
}

resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("error doing login request: %v", err)
}
resp.Body.Close() // nothing to handle but still should close
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("error code from login request, status code: %v", resp.StatusCode)
}

uri = "https://api.zekereyna.dev/v1/" + r.Resource + "/backfill"
if r.Resource == "spotify" {
uri += fmt.Sprintf("?start=%v&end=%v", r.Start, r.End)
}
logger.Info("refreshing server, uri: " + uri)
req, err = http.NewRequestWithContext(ctx, http.MethodPost, uri, nil)
if err != nil {
return fmt.Errorf("error making request: %v", err)
}

resp, err = client.Do(req)
if err != nil {
return fmt.Errorf("error doing request, err: %v", err)
}
resp.Body.Close() // nothing to handle but still should close
if resp.StatusCode >= 400 {
return fmt.Errorf("error doing request, status code: %v", resp.StatusCode)
}

logger.Info("successfully refreshed server")

return nil
}
10 changes: 6 additions & 4 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ package main
import (
"context"
"errors"
sloggin "github.com/samber/slog-gin"
"log/slog"
"net/http"
"os"
"os/signal"
"strconv"
"time"

sloggin "github.com/samber/slog-gin"

"github.com/gin-gonic/gin"
"github.com/prometheus/client_golang/prometheus/promhttp"
cors "github.com/rs/cors/wrapper/gin"
Expand All @@ -32,9 +33,10 @@ func main() {
}

var cli struct {
Server ServerCmd `cmd:"" help:"run server"`
Scrape ScrapeCmd `cmd:"" help:"scrape the internet"`
Dump DumpCmd `cmd:"" help:"dump from sqlite to postgres"`
Server ServerCmd `cmd:"" help:"run server"`
Scrape ScrapeCmd `cmd:"" help:"scrape the internet"`
Dump DumpCmd `cmd:"" help:"dump from sqlite to postgres"`
Backfill BackfillCmd `cmd:"" help:"hit the server"`
}

type ServerCmd struct {
Expand Down
38 changes: 38 additions & 0 deletions internal/reddit/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"database/sql"
"log/slog"
"net/http"
"time"

"github.com/gin-gonic/gin"

Expand Down Expand Up @@ -37,6 +38,7 @@ func (svc Controller) Register(r gin.IRouter, auth gin.HandlerFunc) {
g.GET("/posts", zgin.WithUser(svc.getPosts))
g.GET("/subreddits", zgin.WithUser(svc.getSubreddits))
g.POST("/refresh", zgin.WithUser(svc.refresh))
g.POST("/backfill", zgin.WithUser(svc.backfill))
}

func (svc Controller) getPosts(c *gin.Context, userID int, logger *slog.Logger) {
Expand Down Expand Up @@ -95,3 +97,39 @@ func (svc Controller) refresh(c *gin.Context, userID int, logger *slog.Logger) {

c.IndentedJSON(http.StatusOK, gin.H{"num_refreshed": len(ids)})
}

func (svc Controller) backfill(c *gin.Context, userID int, logger *slog.Logger) {
// TODO(zeke): to have this, the client needs to be updated.
// generally need to pass a start/stop.
// I _think_ the way it works, is
// saved posts are turned with a SORT BY created LIMIT 50
// of sorts. And then providing an `after` param will give you the next with a
// WHERE created_at < after.datetime
// or something.
//
// for now, just do a refresh with all! will likely take a while, but hopefully isn't so bad.
go func() {
// TODO(zeke): create a new context?
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
defer cancel()
savedPosts, err := svc.Client.Fetch(ctx, true)
if err != nil {
logger.Error("error fetching posts", "error", err)
return
}

logger.Info("successfully fetched posts", slog.Int("num_posts", len(savedPosts)))

ids, err := svc.Store.PersistPosts(ctx, savedPosts, userID)
if err != nil {
logger.Error("error persisting posts", "error", err)
return
}

logger.Info("successfully persisted posts", slog.Int("num_persisted", len(ids)))
}()

c.IndentedJSON(http.StatusAccepted, gin.H{
"message": "backfill successfully started",
})
}
111 changes: 96 additions & 15 deletions internal/spotify/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package spotify
import (
"context"
"database/sql"
"github.com/zestze/zest-backend/internal/publisher"
"fmt"
"log/slog"
"net/http"
"time"

"github.com/zestze/zest-backend/internal/publisher"

"github.com/gin-gonic/gin"
"github.com/zestze/zest-backend/internal/zgin"
)
Expand Down Expand Up @@ -40,35 +42,39 @@ func (svc Controller) Register(r gin.IRouter, auth gin.HandlerFunc) {
g := r.Group("/spotify")
g.Use(auth)
g.POST("/refresh", zgin.WithUser(svc.refresh))
g.POST("/backfill", zgin.WithUser(svc.backfill))
g.POST("/token", zgin.WithUser(svc.addToken))
g.GET("/songs", zgin.WithUser(svc.getSongs))
g.GET("/artists", zgin.WithUser(svc.getArtists))
g.GET("/artist/songs", zgin.WithUser(svc.getSongsForArtist))
}

func (svc Controller) refresh(c *gin.Context, userID int, logger *slog.Logger) {
token, err := svc.StoreV2.GetToken(c, userID)
func (svc Controller) fetchToken(ctx context.Context, userID int) (AccessToken, error) {
token, err := svc.StoreV2.GetToken(ctx, userID)
if err != nil {
logger.Error("error fetching token", "error", err)
zgin.InternalError(c)
return
return AccessToken{}, fmt.Errorf("error getting token %w", err)
}

logger.Info("successfully fetched token")
if token.Expired() {
token, err = svc.Client.RefreshAccess(c, token)
token, err = svc.Client.RefreshAccess(ctx, token)
if err != nil {
logger.Error("error refreshing token", "error", err)
zgin.InternalError(c)
return
return AccessToken{}, fmt.Errorf("error refreshing token %w", err)
}

if err = svc.StoreV2.PersistToken(c, token, userID); err != nil {
logger.Error("error persisting token", "error", err)
zgin.InternalError(c)
return
if err = svc.StoreV2.PersistToken(ctx, token, userID); err != nil {
return AccessToken{}, fmt.Errorf("error persisting token %w", err)
}
}
return token, nil
}

func (svc Controller) refresh(c *gin.Context, userID int, logger *slog.Logger) {
token, err := svc.fetchToken(c, userID)
if err != nil {
logger.Error("error fetching token", "error", err)
zgin.InternalError(c)
return
}

after := time.Now().Add(-time.Hour).UTC()
items, err := svc.Client.GetRecentlyPlayed(c, token, after)
Expand Down Expand Up @@ -111,6 +117,81 @@ func (svc Controller) refresh(c *gin.Context, userID int, logger *slog.Logger) {
c.IndentedJSON(http.StatusOK, msg)
}

// TODO(zeke): generally backfill doesn't feel like a great reason to have a separate endpoint
func (svc Controller) backfill(c *gin.Context, userID int, logger *slog.Logger) {
qStart, qEnd := c.Query("start"), c.Query("end")
if qStart == "" || qEnd == "" {
zgin.BadRequest(c, "please provide start and end for backfill")
return
}

// TODO(zeke): maybe put timezone into env var?
// parse as datetime!
loc, err := time.LoadLocation("America/New_York")
if err != nil {
logger.Error("somehow didn't load location", "error", err)
zgin.InternalError(c)
return
}
start, err := time.ParseInLocation(time.DateOnly, qStart, loc)
if err != nil {
zgin.BadRequest(c, "start must be provided as format "+time.DateOnly)
return
}
end, err := time.ParseInLocation(time.DateOnly, qEnd, loc)
if err != nil {
zgin.BadRequest(c, "end must be provided as format "+time.DateOnly)
return
}

// then fetch token
token, err := svc.fetchToken(c, userID)
if err != nil {
logger.Error("error fetching token", "error", err)
zgin.InternalError(c)
return
}

// finally, put rest of job as an async goroutine
go func() {
// TODO(zeke): need to grab context keys such as request id!
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Minute)
defer cancel()
for {
// TODO(zeke): might need to try a strategy where we request backwards.
// it looks like right now it's just giving me literally what has been recently played.
// when i want more history!
if start.After(end) {
logger.Info("ending loop due to hitting end")
return
}

items, err := svc.Client.GetRecentlyPlayed(ctx, token, start)
if err != nil {
logger.Error("error fetching songs", "error", err, "after", start)
return
}
if len(items) == 0 {
logger.Info("ending backfill due to lack of songs", "after", start)
return
}

persisted, err := svc.StoreV2.PersistRecentlyPlayed(ctx, items, userID)
if err != nil {
logger.Error("error persisting songs", "error", err)
return
}
logger.Info("successfully persisted songs", "num_persisted", len(persisted))

start = items[len(items)-1].PlayedAt
}
}()

c.IndentedJSON(http.StatusAccepted, gin.H{
"message": "backfill successfully started",
})
}

func (svc Controller) addToken(c *gin.Context, userID int, logger *slog.Logger) {
var token AccessToken
if err := c.ShouldBindJSON(&token); err != nil {
Expand Down
Loading