15
15
package ingest
16
16
17
17
import (
18
+ << << << < HEAD
18
19
"github.com/pingcap/errors"
20
+ == == == =
21
+ "sync"
22
+
23
+ >> >> >> > f7d5db24b3 (ddl / ingest : add mutex to disk root (#41029 ))
19
24
lcom "github.com/pingcap/tidb/br/pkg/lightning/common"
20
25
"github.com/pingcap/tidb/sessionctx/variable"
21
26
"github.com/pingcap/tidb/util/logutil"
@@ -38,6 +43,7 @@ type diskRootImpl struct {
38
43
currentUsage uint64
39
44
maxQuota uint64
40
45
bcCtx * backendCtxManager
46
+ mu sync.RWMutex
41
47
}
42
48
43
49
// NewDiskRootImpl creates a new DiskRoot.
@@ -50,22 +56,32 @@ func NewDiskRootImpl(path string, bcCtx *backendCtxManager) DiskRoot {
50
56
51
57
// CurrentUsage implements DiskRoot interface.
52
58
func (d * diskRootImpl ) CurrentUsage () uint64 {
53
- return d .currentUsage
59
+ d .mu .RLock ()
60
+ usage := d .currentUsage
61
+ d .mu .RUnlock ()
62
+ return usage
54
63
}
55
64
56
65
// MaxQuota implements DiskRoot interface.
57
66
func (d * diskRootImpl ) MaxQuota () uint64 {
58
- return d .maxQuota
67
+ d .mu .RLock ()
68
+ quota := d .maxQuota
69
+ d .mu .RUnlock ()
70
+ return quota
59
71
}
60
72
61
73
// UpdateUsageAndQuota implements DiskRoot interface.
62
74
func (d * diskRootImpl ) UpdateUsageAndQuota () error {
63
- d . currentUsage = d .bcCtx .TotalDiskUsage ()
75
+ totalDiskUsage : = d .bcCtx .TotalDiskUsage ()
64
76
sz , err := lcom .GetStorageSize (d .path )
65
77
if err != nil {
66
78
logutil .BgLogger ().Error (LitErrGetStorageQuota , zap .Error (err ))
67
79
return errors .New (LitErrGetStorageQuota )
68
80
}
69
- d .maxQuota = mathutil .Min (variable .DDLDiskQuota .Load (), uint64 (capacityThreshold * float64 (sz .Capacity )))
81
+ maxQuota := mathutil .Min (variable .DDLDiskQuota .Load (), uint64 (capacityThreshold * float64 (sz .Capacity )))
82
+ d .mu .Lock ()
83
+ d .currentUsage = totalDiskUsage
84
+ d .maxQuota = maxQuota
85
+ d .mu .Unlock ()
70
86
return nil
71
87
}
0 commit comments