main.go
package leaselock

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"github.com/google/uuid"
	coordinationv1 "k8s.io/api/coordination/v1"
	k8serrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	coordinationclientv1 "k8s.io/client-go/kubernetes/typed/coordination/v1"
	"k8s.io/utils/ptr"
)

// Locker implements the Locker interface using the Kubernetes Lease resource.
type Locker struct {
	// holderIdentity is a unique ID for the client acquiring the lock.
	holderIdentity string
	// leaseClient is the client used to interact with the Lease resource.
	leaseClient coordinationclientv1.LeaseInterface
	// LeaseDuration is the duration a lock can exist before it can be forcibly acquired by another client.
	LeaseDuration time.Duration
	// goroutineCancel cancels the context of the lock-renewal goroutine.
	goroutineCancel context.CancelFunc
	// name is the name of the Lease resource. Only one client can hold a Lease of a given name at a time.
	name string
	// retryWait is the duration the Lock function waits before retrying after failing to acquire the lock.
	retryWait time.Duration
	// wg ensures the renewal goroutine has exited before an unlock is attempted,
	// preventing any risk of a race condition.
	wg sync.WaitGroup
}

// NewLocker creates a Locker, creating the named Lease resource if it does not already exist.
func NewLocker(
	ctx context.Context,
	client kubernetes.Interface,
	name string,
	holderIdentity string,
	namespace string,
) (*Locker, error) {
	// Create the Locker.
	locker := &Locker{
		name: name,
		// Use the identifiable hostname, but append a random suffix: the holder
		// identity must be unique so that no other client can call Unlock on a
		// lock it does not hold.
		holderIdentity: holderIdentity + uuid.NewString(),
		// Time to wait between attempts to acquire the lock.
		retryWait: time.Second * 3,
		// LeaseDuration is the amount of time before the lease expires. Renewal takes place at half this time.
		LeaseDuration: 10 * time.Second,
		// Create the Lease client.
		leaseClient: client.CoordinationV1().Leases(namespace),
	}
	// Create the Lease if it doesn't exist.
	_, err := locker.leaseClient.Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		if !k8serrors.IsNotFound(err) {
			return nil, err
		}
		lease := &coordinationv1.Lease{
			ObjectMeta: metav1.ObjectMeta{
				Name: name,
			},
			Spec: coordinationv1.LeaseSpec{
				LeaseTransitions: ptr.To(int32(0)),
			},
		}
		_, err := locker.leaseClient.Create(ctx, lease, metav1.CreateOptions{})
		if err != nil {
			// If the lease was created by another process after the Get call but before
			// the Create call, the error will be "AlreadyExists". In that case we can
			// simply continue with the existing lease.
			if !k8serrors.IsAlreadyExists(err) {
				return nil, err
			}
		}
	}
	return locker, nil
}

// Lock acquires the lock using the Kubernetes Lease resource. It blocks,
// retrying in a loop until the lock is acquired. The caller must call Unlock
// to release the lock.
func (l *Locker) Lock(ctx context.Context) error {
	// Block until we get a lock.
	for {
		// Get the Lease from Kubernetes.
		lease, err := l.leaseClient.Get(ctx, l.name, metav1.GetOptions{})
		if err != nil {
			return fmt.Errorf("could not get lease resource for lock: %w", err)
		}
		// Check if the lock is already held and, if it is, whether it has
		// expired. If it has not expired, wait for retryWait before trying again.
		if lease.Spec.HolderIdentity != nil {
			// Get the renew time from the Lease.
			if lease.Spec.RenewTime == nil {
				// Avoid risk of a panic by returning an error.
				return errors.New("lease renew time is nil")
			}
			// The lease duration as set via LeaseDuration when the lock was acquired.
			if lease.Spec.LeaseDurationSeconds == nil {
				// Avoid risk of a panic by returning an error.
				return errors.New("lease duration is nil")
			}
			// Convert the int32 duration into a time.Duration.
			leaseDuration := time.Duration(*lease.Spec.LeaseDurationSeconds) * time.Second
			// If RenewTime+leaseDuration is after now, the lock is still held
			// and has not expired. Wait for retryWait before trying again.
			if lease.Spec.RenewTime.Time.Add(leaseDuration).After(time.Now()) {
				select {
				case <-time.After(l.retryWait):
				case <-ctx.Done():
					return ctx.Err()
				}
				continue
			}
		}
		// Nobody holds the lock (or it has expired), so try to acquire it.
		lease.Spec.HolderIdentity = ptr.To(l.holderIdentity)
		// Increment the lease transitions, guarding against a nil counter on
		// Leases that were created without one.
		transitions := int32(0)
		if lease.Spec.LeaseTransitions != nil {
			transitions = *lease.Spec.LeaseTransitions
		}
		lease.Spec.LeaseTransitions = ptr.To(transitions + 1)
		// Set the acquire time.
		lease.Spec.AcquireTime = &metav1.MicroTime{Time: time.Now()}
		// Set the renew time to now.
		lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
		// Record the lease duration in seconds.
		lease.Spec.LeaseDurationSeconds = ptr.To(int32(l.LeaseDuration.Seconds()))
		// Update the Lease. The API server rejects the update with a conflict
		// error if another client has modified the Lease since our Get.
		_, err = l.leaseClient.Update(ctx, lease, metav1.UpdateOptions{})
		if err == nil {
			// We got the lock; break the loop.
			slog.Debug("lease lock acquired")
			break
		}
		// If the error isn't a conflict then something went wrong.
		if !k8serrors.IsConflict(err) {
			return fmt.Errorf("error trying to update Lease: %w", err)
		}
		// If it is a conflict, another client beat us to the lock since we
		// fetched it. Wait before trying again.
		select {
		case <-time.After(l.retryWait):
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	// Store the goroutine cancel function in the Locker so it can be called by Unlock.
	var goroutineCtx context.Context
	goroutineCtx, l.goroutineCancel = context.WithCancel(ctx)
	// Track the goroutine in the wait group so Unlock can wait for it to exit.
	l.wg.Add(1)
	// Start a goroutine to renew the lock.
	go func() {
		defer l.goroutineCancel()
		defer l.wg.Done()
		// Create a timer to renew the lock. We have only just acquired the lock,
		// so we do not need to renew immediately; we can wait for the first tick.
		renewDuration := l.LeaseDuration / 2
		timer := time.NewTimer(renewDuration)
		defer timer.Stop()
		for {
			// Wait for the next tick or context cancellation.
			select {
			case <-goroutineCtx.Done():
				return
			case <-timer.C:
				// Renew the lock. If the context is cancelled, stop the
				// goroutine, as the user has requested the stop.
				err := renewLock(goroutineCtx, l)
				if err != nil && !errors.Is(err, context.Canceled) {
					// Log here instead of using an error channel: the broader
					// function has already returned to let the caller continue.
					slog.Error("error renewing lock", "error", err)
					return
				}
				// Reset the timer to half the lease duration.
				timer.Reset(renewDuration)
			}
		}
	}()
	return nil
}

// renewLock renews a held lock by updating the lease's renew time, provided
// the lock is still held and the caller is the lock holder.
func renewLock(goroutineCtx context.Context, l *Locker) error {
	// Get the Lease.
	lease, err := l.leaseClient.Get(goroutineCtx, l.name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	// Check that the lock is still held.
	if lease.Spec.HolderIdentity == nil {
		return errors.New("lock is no longer held")
	}
	// Check that we are still the lock holder.
	if *lease.Spec.HolderIdentity != l.holderIdentity {
		return errors.New("no longer the lock holder")
	}
	// Set the renew time to now.
	lease.Spec.RenewTime = &metav1.MicroTime{Time: time.Now()}
	// Update the Lease.
	_, err = l.leaseClient.Update(goroutineCtx, lease, metav1.UpdateOptions{})
	if err != nil {
		return err
	}
	return nil
}

// Unlock releases the lock held by the client.
func (l *Locker) Unlock() error {
	// Stop the lock renewal goroutine.
	l.goroutineCancel()
	// Wait for the renew goroutine to exit. This ensures there is no race
	// condition: if an Update has already been issued, cancelling the context
	// may not abort the request.
	l.wg.Wait()
	// Build a timeout context. We do not want to use a context from the main
	// process, as we always want the unlock to occur at the end of the calling
	// function. The timeout prevents the unlock attempt from hanging
	// indefinitely. We use l.LeaseDuration as the timeout because that is the
	// maximum time a lock can be held anyway, so after that amount of time
	// there is no point in trying to remove it.
	ctx, cancel := context.WithTimeout(context.Background(), l.LeaseDuration)
	defer cancel()
	// Get the Lease.
	lease, err := l.leaseClient.Get(ctx, l.name, metav1.GetOptions{})
	if err != nil {
		return fmt.Errorf("could not get lease resource for lock: %w", err)
	}
	// The holder has to have a value.
	if lease.Spec.HolderIdentity == nil {
		return errors.New("no lock holder value")
	}
	// Check that we are the lock holder.
	if *lease.Spec.HolderIdentity != l.holderIdentity {
		return errors.New("not the lock holder")
	}
	// Clear the holder, acquire time, duration, and renew time.
	lease.Spec.HolderIdentity = nil
	lease.Spec.AcquireTime = nil
	lease.Spec.LeaseDurationSeconds = nil
	lease.Spec.RenewTime = nil
	// Update the Lease.
	_, err = l.leaseClient.Update(ctx, lease, metav1.UpdateOptions{})
	if err != nil {
		return fmt.Errorf("unlock: error when trying to update Lease: %w", err)
	}
	slog.Debug("lease lock released")
	return nil
}
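
// A sketch of exercising the Locker in a test (hypothetical wiring; assumes
// the k8s.io/client-go/kubernetes/fake package for an in-memory clientset):
//
//	client := fake.NewSimpleClientset()
//	locker, err := NewLocker(context.Background(), client, "test-lease", "tester", "default")
//	if err != nil {
//		// handle error
//	}
//	if err := locker.Lock(context.Background()); err != nil {
//		// handle error
//	}
//	defer locker.Unlock()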