Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

downloader: manual .lock remove may lead to race and creation of data files without .torrent #9782

Merged
merged 16 commits into from
Apr 11, 2024
10 changes: 3 additions & 7 deletions erigon-lib/downloader/downloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1007,7 +1007,7 @@ func (d *Downloader) mainLoop(silent bool) error {

switch {
case len(t.PeerConns()) > 0:
d.logger.Debug("[snapshots] Downloading from torrent", "file", t.Name(), "peers", len(t.PeerConns()))
d.logger.Debug("[snapshots] Downloading from BitTorrent", "file", t.Name(), "peers", len(t.PeerConns()))
delete(waiting, t.Name())
d.torrentDownload(t, downloadComplete, sem)
case len(t.WebseedPeerConns()) > 0:
Expand Down Expand Up @@ -2108,7 +2108,7 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
if d.alreadyHaveThisName(name) || !IsSnapNameAllowed(name) {
return nil
}
isProhibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
isProhibited, err := d.torrentFiles.NewDownloadsAreProhibited(name)
if err != nil {
return err
}
Expand Down Expand Up @@ -2145,12 +2145,8 @@ func (d *Downloader) AddMagnetLink(ctx context.Context, infoHash metainfo.Hash,
// TOOD: add `d.webseeds.Complete` chan - to prevent race - Discover is also async
// TOOD: maybe run it in goroutine and return channel - to select with p2p

ok, err := d.webseeds.DownloadAndSaveTorrentFile(ctx, name)
ts, ok, err := d.webseeds.DownloadAndSaveTorrentFile(ctx, name)
if ok && err == nil {
ts, err := d.torrentFiles.LoadByPath(filepath.Join(d.SnapDir(), name+".torrent"))
if err != nil {
return
}
_, _, err = addTorrentFile(ctx, ts, d.torrentClient, d.db, d.webseeds)
if err != nil {
return
Expand Down
2 changes: 1 addition & 1 deletion erigon-lib/downloader/downloader_grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ type GrpcServer struct {
}

func (s *GrpcServer) ProhibitNewDownloads(ctx context.Context, req *proto_downloader.ProhibitNewDownloadsRequest) (*emptypb.Empty, error) {
return &emptypb.Empty{}, s.d.torrentFiles.prohibitNewDownloads(req.Type)
return &emptypb.Empty{}, s.d.torrentFiles.ProhibitNewDownloads(req.Type)
}

// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
Expand Down
48 changes: 42 additions & 6 deletions erigon-lib/downloader/torrent_files.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,9 +57,37 @@ func (tf *TorrentFiles) Create(name string, res []byte) error {

tf.lock.Lock()
defer tf.lock.Unlock()
return tf.create(filepath.Join(tf.dir, name), res)
return tf.create(name, res)
}
func (tf *TorrentFiles) create(torrentFilePath string, res []byte) error {

func (tf *TorrentFiles) CreateIfNotProhibited(name string, res []byte) (ts *torrent.TorrentSpec, prohibited, created bool, err error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prohibited and created results are not used anywhere - are they going to be used in a future PR? if not then should we remove them from the func signature?

also looks like created is always returned as false - seems like we are missing a statement to set created to true and return that in the case of a file getting created

tf.lock.Lock()
defer tf.lock.Unlock()
prohibited, err = tf.newDownloadsAreProhibited(name)
if err != nil {
return nil, false, false, err
}

if !tf.exists(name) && !prohibited {
err = tf.create(name, res)
if err != nil {
return nil, false, false, err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

created is always false.

Does it makes sense to just check lock file exists on the start instead of checking it per file? Like, we started, downloads either prohibited or not, that's it. Could be changed only with restart.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

“ instead of checking it per file? ” - this feature is just introduced by Giulio - to prevent downloading 1 type of file (he working on nee type of file): #9766
Don’t know: maybe it was doable by creating special branch in “erigon-snapshots” repo (maybe not).

}
}

ts, err = tf.load(filepath.Join(tf.dir, name))
if err != nil {
return nil, false, false, err
}
return ts, prohibited, false, nil
}

func (tf *TorrentFiles) create(name string, res []byte) error {
if !strings.HasSuffix(name, ".torrent") {
name += ".torrent"
}
torrentFilePath := filepath.Join(tf.dir, name)

if len(res) == 0 {
return fmt.Errorf("try to write 0 bytes to file: %s", torrentFilePath)
}
Expand Down Expand Up @@ -132,9 +160,13 @@ const ProhibitNewDownloadsFileName = "prohibit_new_downloads.lock"
// Erigon "download once" - means restart/upgrade/downgrade will not download files (and will be fast)
// After "download once" - Erigon will produce and seed new files
// Downloader will able: seed new files (already existing on FS), download uncomplete parts of existing files (if Verify found some bad parts)
func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
func (tf *TorrentFiles) ProhibitNewDownloads(t string) error {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.prohibitNewDownloads(t)
}

func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
// open or create file ProhibitNewDownloadsFileName
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_RDONLY, 0644)
if err != nil {
Expand Down Expand Up @@ -174,9 +206,13 @@ func (tf *TorrentFiles) prohibitNewDownloads(t string) error {
return f.Sync()
}

func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) {
func (tf *TorrentFiles) NewDownloadsAreProhibited(name string) (bool, error) {
tf.lock.Lock()
defer tf.lock.Unlock()
return tf.newDownloadsAreProhibited(name)
}

func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) {
f, err := os.OpenFile(filepath.Join(tf.dir, ProhibitNewDownloadsFileName), os.O_CREATE|os.O_APPEND|os.O_RDONLY, 0644)
if err != nil {
return false, err
Expand All @@ -185,11 +221,11 @@ func (tf *TorrentFiles) newDownloadsAreProhibited(name string) (bool, error) {
var prohibitedList []string
torrentListJsonBytes, err := io.ReadAll(f)
if err != nil {
return false, fmt.Errorf("newDownloadsAreProhibited: read file: %w", err)
return false, fmt.Errorf("NewDownloadsAreProhibited: read file: %w", err)
}
if len(torrentListJsonBytes) > 0 {
if err := json.Unmarshal(torrentListJsonBytes, &prohibitedList); err != nil {
return false, fmt.Errorf("newDownloadsAreProhibited: unmarshal: %w", err)
return false, fmt.Errorf("NewDownloadsAreProhibited: unmarshal: %w", err)
}
}
for _, p := range prohibitedList {
Expand Down
25 changes: 9 additions & 16 deletions erigon-lib/downloader/webseed.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"sync"

"github.com/anacrolix/torrent"

"github.com/c2h5oh/datasize"
"github.com/ledgerwatch/erigon-lib/chain/snapcfg"
"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -338,7 +337,7 @@ func (d *WebSeeds) constructListsOfFiles(ctx context.Context, httpProviders []*u
}
// check if we need to prohibit new downloads for some files
for name := range manifestResponse {
prohibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
prohibited, err := d.torrentFiles.NewDownloadsAreProhibited(name)
if prohibited || err != nil {
delete(manifestResponse, name)
}
Expand All @@ -356,7 +355,7 @@ func (d *WebSeeds) constructListsOfFiles(ctx context.Context, httpProviders []*u
}
// check if we need to prohibit new downloads for some files
for name := range response {
prohibited, err := d.torrentFiles.newDownloadsAreProhibited(name)
prohibited, err := d.torrentFiles.NewDownloadsAreProhibited(name)
if prohibited || err != nil {
delete(response, name)
}
Expand Down Expand Up @@ -576,34 +575,28 @@ func (d *WebSeeds) downloadTorrentFilesFromProviders(ctx context.Context, rootDi
return webSeedMap
}

func (d *WebSeeds) DownloadAndSaveTorrentFile(ctx context.Context, name string) (bool, error) {
func (d *WebSeeds) DownloadAndSaveTorrentFile(ctx context.Context, name string) (ts *torrent.TorrentSpec, ok bool, err error) {
urls, ok := d.ByFileName(name)
if !ok {
return false, nil
return nil, false, nil
}
for _, urlStr := range urls {
urlStr += ".torrent"
parsedUrl, err := url.Parse(urlStr)
if err != nil {
d.logger.Log(d.verbosity, "[snapshots] callTorrentHttpProvider parse url", "err", err)
continue
continue // it's ok if some HTTP provider failed - try next one
}
res, err := d.callTorrentHttpProvider(ctx, parsedUrl, name)
if err != nil {
d.logger.Log(d.verbosity, "[snapshots] callTorrentHttpProvider", "name", name, "err", err)
continue
}
if d.torrentFiles.Exists(name) {
continue
}
if err := d.torrentFiles.Create(name, res); err != nil {
d.logger.Log(d.verbosity, "[snapshots] .torrent from webseed rejected", "name", name, "err", err)
continue
continue // it's ok if some HTTP provider failed - try next one
}
return true, nil
ts, _, _, err = d.torrentFiles.CreateIfNotProhibited(name, res)
return ts, ts != nil, err
}

return false, nil
return nil, false, nil
}

func (d *WebSeeds) callTorrentHttpProvider(ctx context.Context, url *url.URL, fileName string) ([]byte, error) {
Expand Down
Loading