diff --git a/api/bludv.go b/api/bludv.go
index e46d613..fee2368 100644
--- a/api/bludv.go
+++ b/api/bludv.go
@@ -44,7 +44,7 @@ func (i *Indexer) HandlerBluDVIndexer(w http.ResponseWriter, r *http.Request) {
 	}
 	fmt.Println("URL:>", url)
 
-	resp, err := http.Get(url)
+	resp, err := i.requester.GetDocument(ctx, url)
 	if err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
 		err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
@@ -54,9 +54,9 @@ func (i *Indexer) HandlerBluDVIndexer(w http.ResponseWriter, r *http.Request) {
 		i.metrics.IndexerErrors.WithLabelValues("bludv").Inc()
 		return
 	}
-	defer resp.Body.Close()
+	defer resp.Close()
 
-	doc, err := goquery.NewDocumentFromReader(resp.Body)
+	doc, err := goquery.NewDocumentFromReader(resp)
 	if err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
 		err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
diff --git a/api/comando_torrents.go b/api/comando_torrents.go
index 2b49858..03ecaed 100644
--- a/api/comando_torrents.go
+++ b/api/comando_torrents.go
@@ -60,7 +60,7 @@ func (i *Indexer) HandlerComandoIndexer(w http.ResponseWriter, r *http.Request)
 	}
 	fmt.Println("URL:>", url)
 
-	resp, err := http.Get(url)
+	resp, err := i.requester.GetDocument(ctx, url)
 	if err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
 		err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
@@ -70,9 +70,9 @@ func (i *Indexer) HandlerComandoIndexer(w http.ResponseWriter, r *http.Request)
 		i.metrics.IndexerErrors.WithLabelValues("comando").Inc()
 		return
 	}
-	defer resp.Body.Close()
+	defer resp.Close()
 
-	doc, err := goquery.NewDocumentFromReader(resp.Body)
+	doc, err := goquery.NewDocumentFromReader(resp)
 	if err != nil {
 		w.WriteHeader(http.StatusInternalServerError)
 		err = json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
@@ -405,17 +405,18 @@ func getDocument(ctx context.Context, i *Indexer, link string) (*goquery.Documen
 	docCache, err := i.redis.Get(ctx, link)
 	if err == nil {
 		i.metrics.CacheHits.WithLabelValues("document_body").Inc()
+		fmt.Printf("returning from long-lived cache: %s\n", link)
 		return goquery.NewDocumentFromReader(io.NopCloser(bytes.NewReader(docCache)))
 	}
 	defer i.metrics.CacheMisses.WithLabelValues("document_body").Inc()
 
-	resp, err := http.Get(link)
+	resp, err := i.requester.GetDocument(ctx, link)
 	if err != nil {
 		return nil, err
 	}
-	defer resp.Body.Close()
+	defer resp.Close()
 
-	body, err := io.ReadAll(resp.Body)
+	body, err := io.ReadAll(resp)
 	if err != nil {
 		return nil, err
 	}
diff --git a/api/index.go b/api/index.go
index 3b84c8a..23da49d 100644
--- a/api/index.go
+++ b/api/index.go
@@ -7,12 +7,14 @@ import (
 
 	"github.com/felipemarinho97/torrent-indexer/cache"
 	"github.com/felipemarinho97/torrent-indexer/monitoring"
+	"github.com/felipemarinho97/torrent-indexer/requester"
 	"github.com/felipemarinho97/torrent-indexer/schema"
 )
 
 type Indexer struct {
-	redis   *cache.Redis
-	metrics *monitoring.Metrics
+	redis     *cache.Redis
+	metrics   *monitoring.Metrics
+	requester *requester.Requster
 }
 
 type IndexerMeta struct {
@@ -42,10 +44,11 @@ type IndexedTorrent struct {
 	Similarity    float32   `json:"similarity"`
 }
 
-func NewIndexers(redis *cache.Redis, metrics *monitoring.Metrics) *Indexer {
+func NewIndexers(redis *cache.Redis, metrics *monitoring.Metrics, req *requester.Requster) *Indexer {
 	return &Indexer{
-		redis:   redis,
-		metrics: metrics,
+		redis:     redis,
+		metrics:   metrics,
+		requester: req,
 	}
 }
 
diff --git a/cache/redis.go b/cache/redis.go
index fc02e67..85736e9 100644
--- a/cache/redis.go
+++ b/cache/redis.go
@@ -10,7 +10,7 @@ import (
 )
 
 var (
-	DefaultExpiration      = 24 * time.Hour * 180 // 180 days
+	DefaultExpiration      = 24 * time.Hour * 7 // 7 days
 	IndexerComandoTorrents = "indexer:comando_torrents"
 )
 
diff --git a/docker-compose.yml b/docker-compose.yml
index 5238dcb..04a90c0 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -11,6 +11,7 @@ services:
       - indexer
     environment:
       - REDIS_HOST=redis
+      - FLARESOLVERR_ADDRESS=http://flaresolverr:8191
 
   redis:
     image: redis:alpine
diff --git a/main.go b/main.go
index 2efda33..5af4584 100644
--- a/main.go
+++ b/main.go
@@ -1,11 +1,14 @@
 package main
 
 import (
+	"fmt"
 	"net/http"
+	"os"
 
 	handler "github.com/felipemarinho97/torrent-indexer/api"
 	"github.com/felipemarinho97/torrent-indexer/cache"
 	"github.com/felipemarinho97/torrent-indexer/monitoring"
+	"github.com/felipemarinho97/torrent-indexer/requester"
 	"github.com/prometheus/client_golang/prometheus/promhttp"
 )
 
@@ -13,7 +16,10 @@ func main() {
 	redis := cache.NewRedis()
 	metrics := monitoring.NewMetrics()
 	metrics.Register()
-	indexers := handler.NewIndexers(redis, metrics)
+
+	flaresolverr := requester.NewFlareSolverr(os.Getenv("FLARESOLVERR_ADDRESS"), 60000)
+	req := requester.NewRequester(flaresolverr, redis)
+	indexers := handler.NewIndexers(redis, metrics, req)
 
 	indexerMux := http.NewServeMux()
 	metricsMux := http.NewServeMux()
@@ -31,7 +37,7 @@
 			panic(err)
 		}
 	}()
-
+	fmt.Println("Server listening on :7006")
 	err := http.ListenAndServe(":7006", indexerMux)
 	if err != nil {
 		panic(err)
diff --git a/requester/flaresolverr.go b/requester/flaresolverr.go
new file mode 100644
index 0000000..1f1839f
--- /dev/null
+++ b/requester/flaresolverr.go
@@ -0,0 +1,230 @@
+package requester
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"sync"
+)
+
+type FlareSolverr struct {
+	url         string
+	maxTimeout  int
+	httpClient  *http.Client
+	sessionPool chan string
+	mu          sync.Mutex
+	initiated   bool
+}
+
+func NewFlareSolverr(url string, timeoutMilli int) *FlareSolverr {
+	poolSize := 5
+	httpClient := &http.Client{}
+	sessionPool := make(chan string, poolSize) // Pool size of 5 sessions
+
+	f := &FlareSolverr{
+		url:         url,
+		maxTimeout:  timeoutMilli,
+		httpClient:  httpClient,
+		sessionPool: sessionPool,
+	}
+
+	err := f.FillSessionPool()
+	if err == nil {
+		f.initiated = true
+	}
+
+	return f
+}
+
+func (f *FlareSolverr) FillSessionPool() error {
+	// Check if the pool is already filled
+	if len(f.sessionPool) == cap(f.sessionPool) {
+		return nil
+	}
+
+	// Pre-initialize the pool with existing sessions
+	sessions, err := f.ListSessions()
+	if err != nil {
+		fmt.Println("Failed to list existing FlareSolverr sessions:", err)
+		return err
+	} else {
+		for _, session := range sessions {
+			// Add available sessions to the pool
+			if len(f.sessionPool) < cap(f.sessionPool) {
+				f.sessionPool <- session
+			}
+		}
+		if len(f.sessionPool) > 0 {
+			fmt.Printf("Added %d FlareSolverr sessions to the pool\n", len(f.sessionPool))
+		}
+	}
+
+	// If fewer than poolSize sessions were found, create new ones to fill the pool
+	for len(f.sessionPool) < cap(f.sessionPool) {
+		f.CreateSession()
+	}
+
+	return nil
+}
+
+func (f *FlareSolverr) CreateSession() string {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	body := map[string]string{"cmd": "sessions.create"}
+	jsonBody, err := json.Marshal(body)
+	if err != nil {
+		return ""
+	}
+
+	req, err := http.NewRequest("POST", fmt.Sprintf("%s/v1", f.url), bytes.NewBuffer(jsonBody))
+	if err != nil {
+		return ""
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := f.httpClient.Do(req)
+	if err != nil {
+		return ""
+	}
+
+	defer resp.Body.Close()
+
+	var sessionResponse map[string]interface{}
+	err = json.NewDecoder(resp.Body).Decode(&sessionResponse)
+	if err != nil {
+		return ""
+	}
+
+	session := sessionResponse["session"].(string)
+	// Add session to the pool
+	f.sessionPool <- session
+
+	fmt.Println("Created new FlareSolverr session:", session)
+	return session
+}
+
+func (f *FlareSolverr) ListSessions() ([]string, error) {
+	body := map[string]string{"cmd": "sessions.list"}
+	jsonBody, err := json.Marshal(body)
+	if err != nil {
+		return nil, err
+	}
+
+	req, err := http.NewRequest("POST", fmt.Sprintf("%s/v1", f.url), bytes.NewBuffer(jsonBody))
+	if err != nil {
+		return nil, err
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := f.httpClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+
+	defer resp.Body.Close()
+
+	var sessionsResponse map[string]interface{}
+	err = json.NewDecoder(resp.Body).Decode(&sessionsResponse)
+	if err != nil {
+		return nil, err
+	}
+
+	sessions := sessionsResponse["sessions"].([]interface{})
+	var sessionIDs []string
+	for _, session := range sessions {
+		sessionIDs = append(sessionIDs, session.(string))
+	}
+
+	return sessionIDs, nil
+}
+
+func (f *FlareSolverr) RetrieveSession() string {
+	// Blocking receive from the session pool.
+	session := <-f.sessionPool
+	return session
+}
+
+type Response struct {
+	Status   string `json:"status"`
+	Message  string `json:"message"`
+	Solution struct {
+		Url     string `json:"url"`
+		Status  int    `json:"status"`
+		Cookies []struct {
+			Domain   string `json:"domain"`
+			Expiry   int    `json:"expiry"`
+			HttpOnly bool   `json:"httpOnly"`
+			Name     string `json:"name"`
+			Path     string `json:"path"`
+			SameSite string `json:"sameSite"`
+			Secure   bool   `json:"secure"`
+			Value    string `json:"value"`
+		} `json:"cookies"`
+		UserAgent string            `json:"userAgent"`
+		Headers   map[string]string `json:"headers"`
+		Response  string            `json:"response"`
+	} `json:"solution"`
+}
+
+func (f *FlareSolverr) Get(url string) (io.ReadCloser, error) {
+	// Check if the FlareSolverr instance was initiated
+	if !f.initiated {
+		return io.NopCloser(bytes.NewReader([]byte(""))), nil
+	}
+
+	// Retrieve session from the pool (blocking if no sessions available)
+	session := f.RetrieveSession()
+
+	// Ensure the session is returned to the pool after the request is done
+	defer func() {
+		f.sessionPool <- session
+	}()
+
+	body := map[string]string{
+		"cmd":        "request.get",
+		"url":        url,
+		"maxTimeout": fmt.Sprintf("%d", f.maxTimeout),
+		"session":    session,
+	}
+	jsonBody, err := json.Marshal(body)
+	if err != nil {
+		return nil, err
+	}
+	req, err := http.NewRequest("POST", fmt.Sprintf("%s/v1", f.url), bytes.NewBuffer(jsonBody))
+	if err != nil {
+		return nil, err
+	}
+
+	req.Header.Set("Content-Type", "application/json")
+
+	resp, err := f.httpClient.Do(req)
+	if err != nil {
+		return nil, err
+	}
+
+	// Parse the response
+	var response Response
+	err = json.NewDecoder(resp.Body).Decode(&response)
+	if err != nil {
+		return nil, err
+	}
+
+	// Check if the response was successful
+	if response.Status != "ok" {
+		return nil, fmt.Errorf("failed to get response: %s", response.Message)
+	}
+
+	// Check if "Under attack" is in the response
+	if strings.Contains(response.Solution.Response, "Under attack") {
+		return nil, fmt.Errorf("under attack")
+	}
+
+	// Return the response body
+	return io.NopCloser(bytes.NewReader([]byte(response.Solution.Response))), nil
+}
diff --git a/requester/requester.go b/requester/requester.go
new file mode 100644
index 0000000..f08ea2a
--- /dev/null
+++ b/requester/requester.go
@@ -0,0 +1,93 @@
+package requester
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"io"
+	"net/http"
+	"regexp"
+	"time"
+
+	"github.com/felipemarinho97/torrent-indexer/cache"
+)
+
+const (
+	shortLivedCacheExpiration = 30 * time.Minute
+	cacheKey                  = "shortLivedCache"
+)
+
+var challangeRegex = regexp.MustCompile(`(?i)(just a moment|cf-chl-bypass|under attack)`)
+
+type Requster struct {
+	fs         *FlareSolverr
+	c          *cache.Redis
+	httpClient *http.Client
+}
+
+func NewRequester(fs *FlareSolverr, c *cache.Redis) *Requster {
+	return &Requster{fs: fs, httpClient: &http.Client{}, c: c}
+}
+
+func (i *Requster) GetDocument(ctx context.Context, url string) (io.ReadCloser, error) {
+	var body io.ReadCloser
+
+	// try request from short-lived cache
+	key := fmt.Sprintf("%s:%s", cacheKey, url)
+	bodyByte, err := i.c.Get(ctx, key)
+	if err == nil {
+		fmt.Printf("returning from short-lived cache: %s\n", url)
+		body = io.NopCloser(bytes.NewReader(bodyByte))
+		return body, nil
+	}
+
+	// try request with plain client
+	resp, err := i.httpClient.Get(url)
+	if err != nil {
+		// try request with flare solverr
+		body, err = i.fs.Get(url)
+		if err != nil {
+			return nil, fmt.Errorf("failed to do request for url %s: %w", url, err)
+		}
+	} else {
+		defer resp.Body.Close()
+		body = resp.Body
+	}
+
+	bodyByte, err = io.ReadAll(body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read response body: %w", err)
+	}
+	if hasChallange(bodyByte) {
+		// try request with flare solverr
+		body, err = i.fs.Get(url)
+		if err != nil {
+			return nil, fmt.Errorf("failed to do request for url %s: %w", url, err)
+		}
+		bodyByte, err = io.ReadAll(body)
+		if err != nil {
+			return nil, fmt.Errorf("failed to read response body: %w", err)
+		}
+		fmt.Printf("request served from flaresolverr: %s\n", url)
+	} else {
+		fmt.Printf("request served from plain client: %s\n", url)
+	}
+
+	// save response to cache if it's not a challange and body is not empty
+	if !hasChallange(bodyByte) && len(bodyByte) > 0 {
+		err = i.c.SetWithExpiration(ctx, key, bodyByte, shortLivedCacheExpiration)
+		if err != nil {
+			fmt.Printf("failed to save response to cache: %v\n", err)
+		}
+		fmt.Printf("saved to cache: %s\n", url)
+	} else {
+		return nil, fmt.Errorf("response is a challange")
+	}
+
+	return io.NopCloser(bytes.NewReader(bodyByte)), nil
+}
+
+// hasChallange checks if the body contains a challange by regex matching
+func hasChallange(body []byte) bool {
+	return challangeRegex.Match(body)
+}
diff --git a/scrape/info.go b/scrape/info.go
index 57ae3f2..b01e031 100644
--- a/scrape/info.go
+++ b/scrape/info.go
@@ -52,7 +52,7 @@ func GetLeechsAndSeeds(ctx context.Context, r *cache.Redis, m *monitoring.Metric
 		fmt.Println("unable to get peers from cache for infohash:", infoHash)
 	} else {
 		m.CacheMisses.WithLabelValues("peers").Inc()
-		fmt.Println("get from cache> leech:", leech, "seed:", seed)
+		fmt.Println("hash:", infoHash, "get from cache -> leech:", leech, "seed:", seed)
 		return leech, seed, nil
 	}
 
@@ -87,16 +87,18 @@
 	var peer peers
 	for i := 0; i < len(trackers); i++ {
 		select {
+		case <-errChan:
+			// discard error
 		case peer = <-peerChan:
 			err = setPeersToCache(ctx, r, infoHash, peer.Leechers, peer.Seeders)
 			if err != nil {
 				fmt.Println(err)
+			} else {
+				fmt.Println("hash:", infoHash, "get from tracker -> leech:", peer.Leechers, "seed:", peer.Seeders)
 			}
 			return peer.Leechers, peer.Seeders, nil
-		case err := <-errChan:
-			fmt.Println(err)
 		}
 	}
 
-	return 0, 0, fmt.Errorf("unable to get peers from trackers")
+	return 0, 0, fmt.Errorf("unable to get peers from trackers for infohash: %s", infoHash)
 }