Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added command to verify remote manifests from webseeds #9762

Merged
merged 13 commits into from
Mar 24, 2024
2 changes: 1 addition & 1 deletion cmd/capcli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,7 +578,7 @@ func (d *DownloadSnapshots) Run(ctx *Context) error {
if err != nil {
return err
}
downlo, err := downloader.New(ctx, downloaderCfg, dirs, log.Root(), log.LvlInfo, true)
downlo, err := downloader.New(ctx, downloaderCfg, log.Root(), log.LvlInfo, true)
if err != nil {
return err
}
Expand Down
146 changes: 128 additions & 18 deletions cmd/downloader/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"net"
"net/url"
"os"
"path/filepath"
"sort"
"strings"
"time"

Expand Down Expand Up @@ -109,9 +112,18 @@ func init() {
rootCmd.AddCommand(torrentCat)
rootCmd.AddCommand(torrentMagnet)

withDataDir(torrentClean)
rootCmd.AddCommand(torrentClean)

withDataDir(manifestCmd)
withChainFlag(manifestCmd)
rootCmd.AddCommand(manifestCmd)

manifestVerifyCmd.Flags().StringVar(&webseeds, utils.WebSeedsFlag.Name, utils.WebSeedsFlag.Value, utils.WebSeedsFlag.Usage)
manifestVerifyCmd.PersistentFlags().BoolVar(&verifyFailfast, "verify.failfast", false, "Stop on first found error. Report it and exit")
withChainFlag(manifestVerifyCmd)
rootCmd.AddCommand(manifestVerifyCmd)

withDataDir(printTorrentHashes)
withChainFlag(printTorrentHashes)
printTorrentHashes.PersistentFlags().BoolVar(&forceRebuild, "rebuild", false, "Force re-create .torrent files")
Expand Down Expand Up @@ -216,7 +228,7 @@ func Downloader(ctx context.Context, logger log.Logger) error {

cfg.AddTorrentsFromDisk = true // always true unless using uploader - which wants control of torrent files

d, err := downloader.New(ctx, cfg, dirs, logger, log.LvlInfo, seedbox)
d, err := downloader.New(ctx, cfg, logger, log.LvlInfo, seedbox)
if err != nil {
return err
}
Expand Down Expand Up @@ -276,7 +288,7 @@ var printTorrentHashes = &cobra.Command{

var manifestCmd = &cobra.Command{
Use: "manifest",
Example: "go run ./cmd/downloader torrent_hashes --datadir <your_datadir>",
Example: "go run ./cmd/downloader manifest --datadir <your_datadir>",
RunE: func(cmd *cobra.Command, args []string) error {
logger := debug.SetupCobra(cmd, "downloader")
if err := manifest(cmd.Context(), logger); err != nil {
Expand All @@ -286,6 +298,18 @@ var manifestCmd = &cobra.Command{
},
}

var manifestVerifyCmd = &cobra.Command{
Use: "manifest-verify",
Example: "go run ./cmd/downloader manifest-verify --chain <chain> [--webseeds 'a','b','c']",
RunE: func(cmd *cobra.Command, args []string) error {
logger := debug.SetupCobra(cmd, "downloader")
if err := manifestVerify(cmd.Context(), logger); err != nil {
log.Error(err.Error())
}
return nil
},
}

var torrentCat = &cobra.Command{
Use: "torrent_cat",
Example: "go run ./cmd/downloader torrent_cat <path_to_torrent_file>",
Expand All @@ -308,6 +332,44 @@ var torrentCat = &cobra.Command{
return nil
},
}
var torrentClean = &cobra.Command{
Use: "torrent_clean",
Short: "Remove all .torrent files from datadir directory",
Example: "go run ./cmd/downloader torrent_clean --datadir=<datadir>",
RunE: func(cmd *cobra.Command, args []string) error {
dirs := datadir.New(datadirCli)

logger.Info("[snapshots.webseed] processing local file etags")
removedTorrents := 0
walker := func(path string, de fs.DirEntry, err error) error {
if err != nil || de.IsDir() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think you can get de nil ptr here - because err is not nil

if err != nil {
logger.Warn("[snapshots.torrent] walk and cleanup", "err", err, "path", path)
}
return nil //nolint
}

if !strings.HasSuffix(de.Name(), ".torrent") || strings.HasPrefix(de.Name(), ".") {
return nil
}
err = os.Remove(filepath.Join(dirs.Snap, path))
if err != nil {
logger.Warn("[snapshots.torrent] remove", "err", err, "path", path)
return err
}
removedTorrents++
return nil
}

sfs := os.DirFS(dirs.Snap)
if err := fs.WalkDir(sfs, ".", walker); err != nil {
return err
}
logger.Info("[snapshots.torrent] cleanup finished", "count", removedTorrents)
return nil
},
}

var torrentMagnet = &cobra.Command{
Use: "torrent_magnet",
Example: "go run ./cmd/downloader torrent_magnet <path_to_torrent_file>",
Expand All @@ -325,49 +387,97 @@ var torrentMagnet = &cobra.Command{
},
}

func manifestVerify(ctx context.Context, logger log.Logger) error {
webseedsList := common.CliString2Array(webseeds)
if known, ok := snapcfg.KnownWebseeds[chain]; ok {
webseedsList = append(webseedsList, known...)
}

webseedUrlsOrFiles := webseedsList
webseedHttpProviders := make([]*url.URL, 0, len(webseedUrlsOrFiles))
webseedFileProviders := make([]string, 0, len(webseedUrlsOrFiles))
for _, webseed := range webseedUrlsOrFiles {
if !strings.HasPrefix(webseed, "v") { // has marker v1/v2/...
uri, err := url.ParseRequestURI(webseed)
if err != nil {
if strings.HasSuffix(webseed, ".toml") && dir.FileExist(webseed) {
webseedFileProviders = append(webseedFileProviders, webseed)
}
continue
}
webseedHttpProviders = append(webseedHttpProviders, uri)
continue
}

if strings.HasPrefix(webseed, "v1:") {
withoutVerisonPrefix := webseed[3:]
if !strings.HasPrefix(withoutVerisonPrefix, "https:") {
continue
}
uri, err := url.ParseRequestURI(withoutVerisonPrefix)
if err != nil {
log.Warn("[webseed] can't parse url", "err", err, "url", withoutVerisonPrefix)
continue
}
webseedHttpProviders = append(webseedHttpProviders, uri)
} else {
continue
}
}

_ = webseedFileProviders // todo add support of file providers
logger.Warn("file providers are not supported yet", "fileProviders", webseedFileProviders)

wseed := downloader.NewWebSeeds(webseedHttpProviders, log.LvlDebug, logger)
return wseed.VerifyManifestedBuckets(ctx, verifyFailfast)
}

func manifest(ctx context.Context, logger log.Logger) error {
dirs := datadir.New(datadirCli)

files, err := downloader.SeedableFiles(dirs, chain)
if err != nil {
return err
}

extList := []string{
".torrent",
".seg", ".idx", // e2
".kv", ".kvi", ".bt", ".kvei", // e3 domain
".v", ".vi", //e3 hist
".ef", ".efi", //e3 idx
".txt", //salt.txt
//".seg", ".idx", // e2
//".kv", ".kvi", ".bt", ".kvei", // e3 domain
//".v", ".vi", //e3 hist
//".ef", ".efi", //e3 idx
".txt", //salt.txt, manifest.txt
}
l, _ := dir.ListFiles(dirs.Snap, extList...)
for _, fPath := range l {
_, fName := filepath.Split(fPath)
fmt.Printf("%s\n", fName)
files = append(files, fName)
}
l, _ = dir.ListFiles(dirs.SnapDomain, extList...)
for _, fPath := range l {
_, fName := filepath.Split(fPath)
fmt.Printf("domain/%s\n", fName)
files = append(files, "domain/"+fName)
}
l, _ = dir.ListFiles(dirs.SnapHistory, extList...)
for _, fPath := range l {
_, fName := filepath.Split(fPath)
if strings.Contains(fName, "commitment") {
continue
}
fmt.Printf("history/%s\n", fName)
files = append(files, "history/"+fName)
}
l, _ = dir.ListFiles(dirs.SnapIdx, extList...)
for _, fPath := range l {
_, fName := filepath.Split(fPath)
if strings.Contains(fName, "commitment") {
continue
}
fmt.Printf("idx/%s\n", fName)
files = append(files, "idx/"+fName)
}
l, _ = dir.ListFiles(dirs.SnapAccessors, extList...)
for _, fPath := range l {
_, fName := filepath.Split(fPath)
if strings.Contains(fName, "commitment") {
continue
}
fmt.Printf("accessors/%s\n", fName)

sort.Strings(files)
for _, f := range files {
fmt.Printf("%s\n", f)
}
return nil
}
Expand Down
8 changes: 8 additions & 0 deletions cmd/downloader/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,14 @@ downloader --datadir=<your> --chain=mainnet --webseed=<webseed_url>
downloader torrent_cat /path/to.torrent

downloader torrent_magnet /path/to.torrent

downloader torrent_clean --datadir <datadir> # remote all .torrent files in datadir
```

## Remote manifest verify
To check that remote webseeds has available manifest and all manifested files are available, has correct format of ETag, does not have dangling torrents etc.
```
downloader manifest-verify --chain <chain> [--webseeds 'a','b','c']
```

## Faster rsync
Expand Down
58 changes: 14 additions & 44 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type Downloader struct {
type webDownloadInfo struct {
url *url.URL
length int64
md5 string
torrent *torrent.Torrent
}

Expand All @@ -118,7 +119,7 @@ type AggStats struct {
LocalFileHashTime time.Duration
}

func New(ctx context.Context, cfg *downloadercfg.Cfg, dirs datadir.Dirs, logger log.Logger, verbosity log.Lvl, discover bool) (*Downloader, error) {
func New(ctx context.Context, cfg *downloadercfg.Cfg, logger log.Logger, verbosity log.Lvl, discover bool) (*Downloader, error) {
db, c, m, torrentClient, err := openClient(ctx, cfg.Dirs.Downloader, cfg.Dirs.Snap, cfg.ClientConfig)
if err != nil {
return nil, fmt.Errorf("openClient: %w", err)
Expand Down Expand Up @@ -151,7 +152,7 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, dirs datadir.Dirs, logger
torrentClient: torrentClient,
lock: mutex,
stats: stats,
webseeds: &WebSeeds{logger: logger, verbosity: verbosity, downloadTorrentFile: cfg.DownloadTorrentFilesFromWebseed, torrentsWhitelist: lock.Downloads},
webseeds: NewWebSeeds(cfg.WebSeedUrls, verbosity, logger),
logger: logger,
verbosity: verbosity,
torrentFiles: &TorrentFiles{dir: cfg.Dirs.Snap},
Expand All @@ -161,13 +162,13 @@ func New(ctx context.Context, cfg *downloadercfg.Cfg, dirs datadir.Dirs, logger
downloading: map[string]struct{}{},
webseedsDiscover: discover,
}
d.webseeds.SetTorrent(d.torrentFiles, lock.Downloads, cfg.DownloadTorrentFilesFromWebseed)

if cfg.ClientConfig.DownloadRateLimiter != nil {
downloadLimit := cfg.ClientConfig.DownloadRateLimiter.Limit()
d.downloadLimit = &downloadLimit
}

d.webseeds.torrentFiles = d.torrentFiles
d.ctx, d.stopMainLoop = context.WithCancel(ctx)

if cfg.AddTorrentsFromDisk {
Expand Down Expand Up @@ -342,7 +343,7 @@ func initSnapshotLock(ctx context.Context, cfg *downloadercfg.Cfg, db kv.RoDB, s
Chain: cfg.ChainName,
}

files, err := seedableFiles(cfg.Dirs, cfg.ChainName)
files, err := SeedableFiles(cfg.Dirs, cfg.ChainName)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -656,7 +657,8 @@ func (d *Downloader) mainLoop(silent bool) error {
d.wg.Add(1)
go func() {
defer d.wg.Done()
d.webseeds.Discover(d.ctx, d.cfg.WebSeedUrls, d.cfg.WebSeedFiles, d.cfg.Dirs.Snap)
// webseeds.Discover may create new .torrent files on disk
d.webseeds.Discover(d.ctx, d.cfg.WebSeedFiles, d.cfg.Dirs.Snap)
// apply webseeds to existing torrents
if err := d.addTorrentFilesFromDisk(true); err != nil && !errors.Is(err, context.Canceled) {
d.logger.Warn("[snapshots] addTorrentFilesFromDisk", "err", err)
Expand Down Expand Up @@ -1272,8 +1274,6 @@ func (d *Downloader) checkComplete(name string) (bool, int64, *time.Time) {
}

func (d *Downloader) getWebDownloadInfo(t *torrent.Torrent) (webDownloadInfo, []*seedHash, error) {
torrentHash := t.InfoHash()

d.lock.RLock()
info, ok := d.webDownloadInfo[t.Name()]
d.lock.RUnlock()
Expand All @@ -1282,46 +1282,16 @@ func (d *Downloader) getWebDownloadInfo(t *torrent.Torrent) (webDownloadInfo, []
return info, nil, nil
}

seedHashMismatches := make([]*seedHash, 0, len(d.cfg.WebSeedUrls))

for _, webseed := range d.cfg.WebSeedUrls {
downloadUrl := webseed.JoinPath(t.Name())

if headRequest, err := http.NewRequestWithContext(d.ctx, "HEAD", downloadUrl.String(), nil); err == nil {
headResponse, err := http.DefaultClient.Do(headRequest)

if err != nil {
continue
}

headResponse.Body.Close()

if headResponse.StatusCode == http.StatusOK {
if meta, err := getWebpeerTorrentInfo(d.ctx, downloadUrl); err == nil {
if bytes.Equal(torrentHash.Bytes(), meta.HashInfoBytes().Bytes()) {
// TODO check the torrent's hash matches this hash
return webDownloadInfo{
url: downloadUrl,
length: headResponse.ContentLength,
torrent: t,
}, seedHashMismatches, nil
} else {
hash := meta.HashInfoBytes()
seedHashMismatches = append(seedHashMismatches, &seedHash{url: webseed, hash: &hash})
continue
}
}
}
}

seedHashMismatches = append(seedHashMismatches, &seedHash{url: webseed})
// todo this function does not exit on first matched webseed hash, could make unexpected results
infos, seedHashMismatches, err := d.webseeds.getWebDownloadInfo(d.ctx, t)
if err != nil || len(infos) == 0 {
return webDownloadInfo{}, seedHashMismatches, fmt.Errorf("can't find download info: %w", err)
}

return webDownloadInfo{}, seedHashMismatches, fmt.Errorf("can't find download info")
return infos[0], seedHashMismatches, nil
}

func getWebpeerTorrentInfo(ctx context.Context, downloadUrl *url.URL) (*metainfo.MetaInfo, error) {
torrentRequest, err := http.NewRequestWithContext(ctx, "GET", downloadUrl.String()+".torrent", nil)
torrentRequest, err := http.NewRequestWithContext(ctx, http.MethodGet, downloadUrl.String()+".torrent", nil)

if err != nil {
return nil, err
Expand Down Expand Up @@ -2215,7 +2185,7 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
return nil
}

func seedableFiles(dirs datadir.Dirs, chainName string) ([]string, error) {
func SeedableFiles(dirs datadir.Dirs, chainName string) ([]string, error) {
files, err := seedableSegmentFiles(dirs.Snap, chainName)
if err != nil {
return nil, fmt.Errorf("seedableSegmentFiles: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/downloader/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestChangeInfoHashOfSameFile(t *testing.T) {
dirs := datadir.New(t.TempDir())
cfg, err := downloadercfg2.New(dirs, "", lg.Info, 0, 0, 0, 0, 0, nil, nil, "testnet", false)
require.NoError(err)
d, err := New(context.Background(), cfg, dirs, log.New(), log.LvlInfo, true)
d, err := New(context.Background(), cfg, log.New(), log.LvlInfo, true)
require.NoError(err)
defer d.Close()
err = d.AddMagnetLink(d.ctx, snaptype.Hex2InfoHash("aa"), "a.seg")
Expand Down
Loading
Loading