-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathjob_upload.go
113 lines (106 loc) · 2.43 KB
/
job_upload.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package putiosync
import (
"context"
"fmt"
"io"
"os"
"path"
"github.com/cenkalti/log"
"github.com/putdotio/putio-sync/v2/internal/inode"
"github.com/putdotio/putio-sync/v2/internal/progress"
"github.com/putdotio/putio-sync/v2/internal/watcher"
)
type uploadJob struct {
localFile iLocalFile
state *stateType
}
func (d *uploadJob) String() string {
return fmt.Sprintf("Uploading %q", d.localFile.RelPath())
}
func (d *uploadJob) tryResume(ctx context.Context) bool {
if d.state == nil {
return false
}
if d.state.Status != statusUploading {
return false
}
if d.state.UploadURL == "" {
return false
}
if d.state.Size != d.localFile.Info().Size() {
return false
}
in, _ := inode.Get(d.localFile.FullPath(), d.localFile.Info())
if d.state.LocalInode != in {
return false
}
offset, err := client.Upload.GetOffset(ctx, d.state.UploadURL)
if err != nil {
return false
}
d.state.Offset = offset
return offset <= d.localFile.Info().Size()
}
func (d *uploadJob) Run(ctx context.Context) error {
modwatch, err := watcher.WatchFileModification(ctx, d.localFile.FullPath())
if err != nil {
return err
}
defer modwatch.Stop()
ok := d.tryResume(ctx)
if !ok {
in, err := inode.Get(d.localFile.FullPath(), d.localFile.Info())
if err != nil {
return err
}
dir, filename := path.Split(d.localFile.RelPath())
parentID, err := dirCache.Mkdirp(ctx, dir)
if err != nil {
return err
}
location, err := client.Upload.CreateUpload(ctx, filename, parentID, d.localFile.Info().Size(), true)
if err != nil {
return err
}
d.state = &stateType{
Status: statusUploading,
LocalInode: in,
UploadURL: location,
Size: d.localFile.Info().Size(),
relpath: d.localFile.RelPath(),
}
err = d.state.Write()
if err != nil {
return err
}
}
f, err := os.Open(d.localFile.FullPath())
if err != nil {
return err
}
defer f.Close()
_, err = f.Seek(d.state.Offset, io.SeekStart)
if err != nil {
return err
}
pr := progress.New(f, d.state.Offset, d.state.Size, d.String())
pr.Start()
fileID, crc32, err := client.Upload.SendFile(modwatch.Context(), pr, d.state.UploadURL, d.state.Offset)
pr.Stop()
modified := modwatch.Stop()
if modified {
log.Warningln("File modified while uploading")
return nil
}
if err != nil {
return err
}
d.state.Status = statusSynced
d.state.RemoteID = fileID
d.state.CRC32 = crc32
err = d.state.Write()
if err != nil {
return err
}
return nil
}