Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/Flaresolverr support #12

Merged
merged 4 commits into from
Sep 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/bludv.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (i *Indexer) HandlerBluDVIndexer(w http.ResponseWriter, r *http.Request) {
}

fmt.Println("URL:>", url)
resp, err := http.Get(url)
resp, err := i.requester.GetDocument(ctx, url)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
Expand All @@ -54,9 +54,9 @@ func (i *Indexer) HandlerBluDVIndexer(w http.ResponseWriter, r *http.Request) {
i.metrics.IndexerErrors.WithLabelValues("bludv").Inc()
return
}
defer resp.Body.Close()
defer resp.Close()

doc, err := goquery.NewDocumentFromReader(resp.Body)
doc, err := goquery.NewDocumentFromReader(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
Expand Down
13 changes: 7 additions & 6 deletions api/comando_torrents.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (i *Indexer) HandlerComandoIndexer(w http.ResponseWriter, r *http.Request)
}

fmt.Println("URL:>", url)
resp, err := http.Get(url)
resp, err := i.requester.GetDocument(ctx, url)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
Expand All @@ -70,9 +70,9 @@ func (i *Indexer) HandlerComandoIndexer(w http.ResponseWriter, r *http.Request)
i.metrics.IndexerErrors.WithLabelValues("comando").Inc()
return
}
defer resp.Body.Close()
defer resp.Close()

doc, err := goquery.NewDocumentFromReader(resp.Body)
doc, err := goquery.NewDocumentFromReader(resp)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
Expand Down Expand Up @@ -405,17 +405,18 @@ func getDocument(ctx context.Context, i *Indexer, link string) (*goquery.Documen
docCache, err := i.redis.Get(ctx, link)
if err == nil {
i.metrics.CacheHits.WithLabelValues("document_body").Inc()
fmt.Printf("returning from long-lived cache: %s\n", link)
return goquery.NewDocumentFromReader(io.NopCloser(bytes.NewReader(docCache)))
}
defer i.metrics.CacheMisses.WithLabelValues("document_body").Inc()

resp, err := http.Get(link)
resp, err := i.requester.GetDocument(ctx, link)
if err != nil {
return nil, err
}
defer resp.Body.Close()
defer resp.Close()

body, err := io.ReadAll(resp.Body)
body, err := io.ReadAll(resp)
if err != nil {
return nil, err
}
Expand Down
13 changes: 8 additions & 5 deletions api/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (

"github.com/felipemarinho97/torrent-indexer/cache"
"github.com/felipemarinho97/torrent-indexer/monitoring"
"github.com/felipemarinho97/torrent-indexer/requester"
"github.com/felipemarinho97/torrent-indexer/schema"
)

type Indexer struct {
redis *cache.Redis
metrics *monitoring.Metrics
redis *cache.Redis
metrics *monitoring.Metrics
requester *requester.Requster
}

type IndexerMeta struct {
Expand Down Expand Up @@ -42,10 +44,11 @@ type IndexedTorrent struct {
Similarity float32 `json:"similarity"`
}

func NewIndexers(redis *cache.Redis, metrics *monitoring.Metrics) *Indexer {
func NewIndexers(redis *cache.Redis, metrics *monitoring.Metrics, req *requester.Requster) *Indexer {
return &Indexer{
redis: redis,
metrics: metrics,
redis: redis,
metrics: metrics,
requester: req,
}
}

Expand Down
2 changes: 1 addition & 1 deletion cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

var (
DefaultExpiration = 24 * time.Hour * 180 // 180 days
DefaultExpiration = 24 * time.Hour * 7 // 7 days
IndexerComandoTorrents = "indexer:comando_torrents"
)

Expand Down
1 change: 1 addition & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ services:
- indexer
environment:
- REDIS_HOST=redis
- FLARESOLVERR_ADDRESS=http://flaresolverr:8191

redis:
image: redis:alpine
Expand Down
10 changes: 8 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
package main

import (
"fmt"
"net/http"
"os"

handler "github.com/felipemarinho97/torrent-indexer/api"
"github.com/felipemarinho97/torrent-indexer/cache"
"github.com/felipemarinho97/torrent-indexer/monitoring"
"github.com/felipemarinho97/torrent-indexer/requester"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
redis := cache.NewRedis()
metrics := monitoring.NewMetrics()
metrics.Register()
indexers := handler.NewIndexers(redis, metrics)

flaresolverr := requester.NewFlareSolverr(os.Getenv("FLARESOLVERR_ADDRESS"), 60000)
req := requester.NewRequester(flaresolverr, redis)
indexers := handler.NewIndexers(redis, metrics, req)

indexerMux := http.NewServeMux()
metricsMux := http.NewServeMux()
Expand All @@ -31,7 +37,7 @@ func main() {
panic(err)
}
}()

fmt.Println("Server listening on :7006")
err := http.ListenAndServe(":7006", indexerMux)
if err != nil {
panic(err)
Expand Down
230 changes: 230 additions & 0 deletions requester/flaresolverr.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package requester

import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"sync"
)

// FlareSolverr is a client for a FlareSolverr HTTP endpoint. It keeps a
// fixed-size pool of session IDs and lends one out for each Get call.
type FlareSolverr struct {
// url is the base address of the endpoint; commands are POSTed to url + "/v1".
url string
// maxTimeout is forwarded as the "maxTimeout" field of every request.get
// command (the constructor parameter is named timeoutMilli).
maxTimeout int
// httpClient performs every call to the endpoint.
httpClient *http.Client
// sessionPool is a buffered channel of session IDs. Get receives one from it
// and sends it back when the request is done.
sessionPool chan string
// mu is held for the whole duration of CreateSession.
mu sync.Mutex
// initiated is set by NewFlareSolverr when FillSessionPool succeeded. While
// it is false, Get returns an empty body without contacting the endpoint.
initiated bool
}

// NewFlareSolverr builds a FlareSolverr client for the endpoint at url and
// immediately tries to fill its five-slot session pool.
//
// timeoutMilli is stored as maxTimeout and forwarded with every request.get
// command. When the pool cannot be filled the client is still returned, but it
// is left uninitiated, in which case Get yields an empty body instead of
// contacting the endpoint.
func NewFlareSolverr(url string, timeoutMilli int) *FlareSolverr {
	const slots = 5 // number of sessions kept in the pool

	client := &FlareSolverr{
		url:         url,
		maxTimeout:  timeoutMilli,
		httpClient:  &http.Client{},
		sessionPool: make(chan string, slots),
	}

	// A failed fill leaves initiated at false, its zero value.
	client.initiated = client.FillSessionPool() == nil

	return client
}

// FillSessionPool tops the session pool up to its capacity.
//
// Sessions reported by ListSessions are reused first; any remaining slots are
// filled with newly created sessions. It returns nil when the pool ends up
// full (or already was), the ListSessions error when listing fails, and an
// error when a new session cannot be created. In the last case the pool keeps
// whatever sessions were added before the failure.
func (f *FlareSolverr) FillSessionPool() error {
	// Nothing to do when the pool is already full.
	if len(f.sessionPool) == cap(f.sessionPool) {
		return nil
	}

	// Seed the pool with sessions that already exist on the endpoint.
	sessions, err := f.ListSessions()
	if err != nil {
		fmt.Println("Failed to list existing FlareSolverr sessions:", err)
		return err
	}
	for _, session := range sessions {
		// Stop once the pool is full; a further send would block forever.
		if len(f.sessionPool) == cap(f.sessionPool) {
			break
		}
		f.sessionPool <- session
	}
	if len(f.sessionPool) > 0 {
		fmt.Printf("Added %d FlareSolverr sessions to the pool\n", len(f.sessionPool))
	}

	// Create new sessions for the remaining slots. CreateSession returns ""
	// without touching the pool when it fails, so bail out instead of looping
	// forever against an endpoint that cannot create sessions.
	for len(f.sessionPool) < cap(f.sessionPool) {
		if f.CreateSession() == "" {
			return fmt.Errorf("creating FlareSolverr session: pool holds %d of %d sessions",
				len(f.sessionPool), cap(f.sessionPool))
		}
	}

	return nil
}

// CreateSession asks the endpoint for a new session via the "sessions.create"
// command, adds the returned session ID to the pool and returns it.
//
// It returns "" and leaves the pool untouched when the request cannot be
// built or sent, when the reply cannot be decoded, or when the reply carries
// no session ID; the reason is printed to standard output.
//
// The mutex is held for the whole call, and the send into the pool blocks when
// the pool is already full, so callers should only invoke this while the pool
// has free capacity.
func (f *FlareSolverr) CreateSession() string {
	f.mu.Lock()
	defer f.mu.Unlock()

	// fail reports why no session was created and yields the "" result.
	fail := func(reason interface{}) string {
		fmt.Println("Failed to create FlareSolverr session:", reason)
		return ""
	}

	jsonBody, err := json.Marshal(map[string]string{"cmd": "sessions.create"})
	if err != nil {
		return fail(err)
	}

	req, err := http.NewRequest("POST", fmt.Sprintf("%s/v1", f.url), bytes.NewReader(jsonBody))
	if err != nil {
		return fail(err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := f.httpClient.Do(req)
	if err != nil {
		return fail(err)
	}
	defer resp.Body.Close()

	// Decode into a typed struct so that a reply without a "session" field
	// (for instance an error reply) is detected instead of causing a panic.
	var created struct {
		Status  string `json:"status"`
		Message string `json:"message"`
		Session string `json:"session"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&created); err != nil {
		return fail(err)
	}
	if created.Session == "" {
		return fail(fmt.Sprintf("no session in reply (status %q, message %q)", created.Status, created.Message))
	}

	// Add session to the pool.
	f.sessionPool <- created.Session

	fmt.Println("Created new FlareSolverr session:", created.Session)
	return created.Session
}

// ListSessions asks the endpoint for its existing sessions via the
// "sessions.list" command and returns their IDs.
//
// It returns an error when the request cannot be built or sent, when the
// reply cannot be decoded into the expected shape, or when the reply carries
// the status "error". A reply without a "sessions" field yields an empty
// result rather than a panic.
func (f *FlareSolverr) ListSessions() ([]string, error) {
	jsonBody, err := json.Marshal(map[string]string{"cmd": "sessions.list"})
	if err != nil {
		return nil, err
	}

	req, err := http.NewRequest("POST", fmt.Sprintf("%s/v1", f.url), bytes.NewReader(jsonBody))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := f.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// A typed target makes the decoder reject non-string session entries with
	// an error, where unchecked type assertions would panic.
	var listing struct {
		Status   string   `json:"status"`
		Message  string   `json:"message"`
		Sessions []string `json:"sessions"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&listing); err != nil {
		return nil, err
	}
	if listing.Status == "error" {
		return nil, fmt.Errorf("listing FlareSolverr sessions: %s", listing.Message)
	}

	return listing.Sessions, nil
}

// RetrieveSession takes one session ID out of the pool, waiting until one
// becomes available if the pool is currently empty.
func (f *FlareSolverr) RetrieveSession() string {
	return <-f.sessionPool
}

// Response is the JSON body that Get decodes from the endpoint's reply to a
// "request.get" command.
type Response struct {
// Status is compared against "ok" by Get; any other value is treated as a failure.
Status string `json:"status"`
// Message is included in the error Get returns when Status is not "ok".
Message string `json:"message"`
// Solution groups the fields decoded from the reply's "solution" object.
Solution struct {
Url string `json:"url"`
Status int `json:"status"`
// Cookies holds the entries of the reply's "cookies" array.
Cookies []struct {
Domain string `json:"domain"`
Expiry int `json:"expiry"`
HttpOnly bool `json:"httpOnly"`
Name string `json:"name"`
Path string `json:"path"`
SameSite string `json:"sameSite"`
Secure bool `json:"secure"`
Value string `json:"value"`
} `json:"cookies"`
UserAgent string `json:"userAgent"`
Headers map[string]string `json:"headers"`
// Response is the page body; Get scans it for "Under attack" and otherwise
// hands it to the caller as a reader.
Response string `json:"response"`
} `json:"solution"`
}

// Get fetches url through the endpoint with a "request.get" command, using a
// session borrowed from the pool, and returns the page body as a reader.
//
// When the client was not initiated (the session pool could not be filled at
// construction time) it returns an empty body and a nil error without
// contacting the endpoint. Otherwise it blocks until a session is available
// and gives the session back to the pool when the call finishes, whether it
// succeeded or not.
//
// An error is returned when the request cannot be built or sent, when the
// reply cannot be decoded, when the reply status is not "ok", or when the page
// body contains "Under attack".
func (f *FlareSolverr) Get(url string) (io.ReadCloser, error) {
	// Check if the FlareSolverr instance was initiated.
	if !f.initiated {
		return io.NopCloser(strings.NewReader("")), nil
	}

	// Retrieve session from the pool (blocking if no sessions available).
	session := f.RetrieveSession()

	// Ensure the session is returned to the pool after the request is done.
	defer func() {
		f.sessionPool <- session
	}()

	body := map[string]string{
		"cmd":        "request.get",
		"url":        url,
		"maxTimeout": fmt.Sprintf("%d", f.maxTimeout),
		"session":    session,
	}
	jsonBody, err := json.Marshal(body)
	if err != nil {
		return nil, err
	}
	req, err := http.NewRequest("POST", fmt.Sprintf("%s/v1", f.url), bytes.NewReader(jsonBody))
	if err != nil {
		return nil, err
	}

	req.Header.Set("Content-Type", "application/json")

	resp, err := f.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	// The reply is fully decoded below, so the body can be released as soon as
	// this function returns; leaving it open would leak the connection.
	defer resp.Body.Close()

	// Parse the response.
	var response Response
	err = json.NewDecoder(resp.Body).Decode(&response)
	if err != nil {
		return nil, err
	}

	// Check if the response was successful.
	if response.Status != "ok" {
		return nil, fmt.Errorf("failed to get response: %s", response.Message)
	}

	// Check if "Under attack" is in the response.
	if strings.Contains(response.Solution.Response, "Under attack") {
		return nil, fmt.Errorf("under attack")
	}

	// Return the response body; strings.NewReader avoids copying it into a
	// fresh byte slice.
	return io.NopCloser(strings.NewReader(response.Solution.Response)), nil
}
Loading