-
Notifications
You must be signed in to change notification settings - Fork 3
/
locker.go
141 lines (114 loc) · 3.4 KB
/
locker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
package etcdlock
import (
"context"
"strings"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/etcdserver/api/v3lock/v3lockpb"
"github.com/coreos/etcd/etcdserver/etcdserverpb"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
// Exposed errors.
var (
// ErrEmptyKey is the sentinel returned (wrapped with a stack trace) by
// Locker.Lock and Locker.IsLocked when they are called with an empty keyName.
ErrEmptyKey = errors.New("empty key")
)
const (
// defaultEtcdKeyPrefix is the key prefix NewLocker falls back to when
// LockerOptions.EtcdKeyPrefix is left empty.
defaultEtcdKeyPrefix = "__etcd_lock/"
// retryCount is the maximum number of times Locker.Lock retries a lock
// request that failed with a "too many requests" error.
retryCount = 3
)
// Locker is the client for acquiring distributed locks from etcd. It should be
// created from NewLocker() function.
type Locker struct {
// etcdKeyPrefix is prepended to every lock name before it is sent to etcd.
etcdKeyPrefix string
// leaseCli grants the lease that each lock request is bound to.
leaseCli etcdserverpb.LeaseClient
// kvCli is used by IsLocked to count the keys stored for a lock name.
kvCli etcdserverpb.KVClient
// lockCli issues the Lock and Unlock calls.
lockCli v3lockpb.LockClient
}
// LockerOptions is the options for NewLocker() function.
type LockerOptions struct {
// Address of the etcd (v3) server; NewLocker passes it to `grpc.Dial` as the
// dial target.
Address string
// Options used for `grpc.Dial`.
DialOptions []grpc.DialOption
// Prefix of the keys of locks in etcd. When left empty, NewLocker uses
// "__etcd_lock/".
EtcdKeyPrefix string
}
// NewLocker dials the etcd server at options.Address using
// options.DialOptions and returns a Locker whose lease, KV and lock clients
// all share that single gRPC connection.
//
// An empty options.EtcdKeyPrefix selects the default prefix "__etcd_lock/".
// If dialing fails, the error from grpc.Dial is returned as is.
func NewLocker(options LockerOptions) (*Locker, error) {
	prefix := options.EtcdKeyPrefix
	if prefix == "" {
		prefix = defaultEtcdKeyPrefix
	}

	conn, err := grpc.Dial(options.Address, options.DialOptions...)
	if err != nil {
		return nil, err
	}

	return &Locker{
		etcdKeyPrefix: prefix,
		leaseCli:      etcdserverpb.NewLeaseClient(conn),
		kvCli:         etcdserverpb.NewKVClient(conn),
		lockCli:       v3lockpb.NewLockClient(conn),
	}, nil
}
// Lock acquires a distributed lock for the specified resource
// from etcd v3.
//
// keyName is the resource name; it is combined with the Locker's key prefix
// before being sent to etcd. An empty keyName yields ErrEmptyKey. timeout is
// used as the TTL of the lease the lock is bound to.
//
// When etcd rejects the request with a "too many requests" error, the call is
// retried up to retryCount times, waiting 500ms multiplied by the attempt
// number between attempts. The wait is abandoned with ctx's error as soon as
// ctx is done. Every attempt uses a freshly granted lease; the lease of a
// failed attempt is revoked (best effort) so it does not linger on the server
// until its TTL runs out.
//
// On success the returned Lock carries the key reported by the server.
func (l *Locker) Lock(ctx context.Context, keyName string, timeout time.Duration) (*Lock, error) {
	// Upper bound for the best-effort lease cleanup after a failed attempt.
	const revokeTimeout = 3 * time.Second

	if keyName == "" {
		return nil, errors.WithStack(ErrEmptyKey)
	}

	name := l.assembleKeyName(keyName)
	for try := 1; ; try++ {
		leaseID, err := l.getLease(ctx, timeout)
		if err != nil {
			return nil, errors.WithStack(err)
		}

		lockRes, err := l.lockCli.Lock(ctx, &v3lockpb.LockRequest{
			Name:  name,
			Lease: leaseID,
		})
		if err == nil {
			return &Lock{locker: l, keyName: lockRes.Key}, nil
		}

		// The lease granted for this attempt is now useless. Revoke it on a
		// detached context so the cleanup still runs when ctx itself is the
		// reason the attempt failed. The result is deliberately ignored: if
		// the revoke fails, the lease simply expires after its TTL.
		revokeCtx, cancel := context.WithTimeout(context.Background(), revokeTimeout)
		_, _ = l.leaseCli.LeaseRevoke(revokeCtx, &etcdserverpb.LeaseRevokeRequest{ID: leaseID})
		cancel()

		// Retry only when the etcd server is too busy to handle transactions.
		if try > retryCount || !strings.Contains(err.Error(), "too many requests") {
			return nil, errors.WithStack(err)
		}

		// Linear back-off that gives up early when the caller cancels.
		backoff := time.NewTimer(time.Duration(try) * 500 * time.Millisecond)
		select {
		case <-ctx.Done():
			backoff.Stop()
			return nil, errors.WithStack(ctx.Err())
		case <-backoff.C:
		}
	}
}
// IsLocked checks whether the specified resource has already been locked.
//
// It counts the etcd keys stored under the prefixed lock name and reports
// true when at least one exists. An empty keyName yields ErrEmptyKey; any
// error from the range request is returned with a stack trace attached.
func (l *Locker) IsLocked(ctx context.Context, keyName string) (bool, error) {
	if keyName == "" {
		return false, errors.WithStack(ErrEmptyKey)
	}

	// etcd's lock service stores each lock key as "<name>/<lease id>".
	// Including the trailing "/" in the scanned prefix stops a lock on another
	// resource whose name merely starts with keyName (e.g. "foobar" when asked
	// about "foo") from being reported as a lock on this one.
	// NOTE(review): names that nest with "/" (a lock on "foo/bar" when asked
	// about "foo") still share this prefix; confirm callers do not use such
	// hierarchical names.
	prefix := string(l.assembleKeyName(keyName)) + "/"
	rangeRes, err := l.kvCli.Range(ctx, &etcdserverpb.RangeRequest{
		Key:       []byte(prefix),
		RangeEnd:  []byte(clientv3.GetPrefixRangeEnd(prefix)),
		CountOnly: true,
	})
	if err != nil {
		return false, errors.WithStack(err)
	}
	return rangeRes.Count != 0, nil
}
// unlock sends an Unlock request for keyName to the etcd lock service.
// It returns nil on success, otherwise the request's error with a stack trace
// attached.
func (l *Locker) unlock(ctx context.Context, keyName []byte) error {
	req := &v3lockpb.UnlockRequest{Key: keyName}
	if _, err := l.lockCli.Unlock(ctx, req); err != nil {
		return errors.WithStack(err)
	}
	return nil
}
// getLease asks etcd to grant a new lease whose TTL covers timeout and
// returns the lease ID.
//
// The request carries the TTL as a whole number of seconds, so a timeout with
// a fractional second is rounded up: the lease never expires earlier than the
// caller asked for, and a positive sub-second timeout does not collapse to a
// TTL of zero. Any error from the grant request is returned with a stack
// trace attached.
func (l *Locker) getLease(ctx context.Context, timeout time.Duration) (int64, error) {
	ttl := int64(timeout / time.Second)
	if timeout%time.Second > 0 {
		ttl++
	}

	leaseRes, err := l.leaseCli.LeaseGrant(ctx, &etcdserverpb.LeaseGrantRequest{
		TTL: ttl,
	})
	if err != nil {
		return 0, errors.WithStack(err)
	}
	return leaseRes.ID, nil
}
// assembleKeyName returns the etcd key for a lock name: the Locker's key
// prefix immediately followed by keyName, as bytes.
func (l *Locker) assembleKeyName(keyName string) []byte {
	full := make([]byte, 0, len(l.etcdKeyPrefix)+len(keyName))
	full = append(full, l.etcdKeyPrefix...)
	return append(full, keyName...)
}