-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathretry.go
314 lines (278 loc) · 9.07 KB
/
retry.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
package transport
import (
	"bytes"
	"context"
	"errors"
	"io"
	"math/rand"
	"net/http"
	"time"
)
// requestCopier pairs a request with a buffered copy of its body so that the
// request can be issued more than once.
//
// Requests contain mutable state that is altered on each pass through a
// Transport. In several ways, the state is mutated to the point that it cannot
// be reused. This component was introduced to account for cases where the
// request body was drained partially or completely but we want to re-issue the
// request.
type requestCopier struct {
// original is the request that every copy is cloned from.
original *http.Request
// body holds the bytes read from the original request's body. It is nil when
// the original request had no body.
body []byte
}
// newRequestCopier buffers the body of r in memory so that the request can be
// replayed any number of times.
//
// When r has a body it is read to EOF and then closed: the reader can not be
// used again once drained, and a RoundTripper is responsible for closing the
// request body. r.Body is set to nil afterwards so that the drained reader is
// not carried into clones of the request; the buffered bytes are kept on the
// copier instead.
//
// If the body can not be read, the read error is returned and no copier is
// produced.
func newRequestCopier(r *http.Request) (*requestCopier, error) {
	if r.Body == nil {
		return &requestCopier{original: r}, nil
	}

	body, err := io.ReadAll(r.Body)
	// The body is consumed (or broken) at this point and can never be read
	// again, so release it now. A failure to close is not actionable here and
	// must not mask the outcome of the read.
	_ = r.Body.Close()
	r.Body = nil
	if err != nil {
		return nil, err
	}
	return &requestCopier{original: r, body: body}, nil
}
// Copy produces a fresh clone of the captured request, bound to the original
// request's context. When a body was captured, the clone receives its own
// reader over the buffered bytes plus a GetBody function that hands out
// further independent readers; otherwise the clone's Body is nil.
func (r *requestCopier) Copy() *http.Request {
	clone := r.original.Clone(r.original.Context())
	clone.Body = nil
	if r.body == nil {
		return clone
	}

	// replay never fails: it only wraps the in-memory buffer.
	replay := func() (io.ReadCloser, error) {
		return io.NopCloser(bytes.NewBuffer(r.body)), nil
	}
	clone.Body, _ = replay()
	clone.GetBody = replay
	return clone
}
// Retrier decides whether the transport should automatically issue a request
// again.
type Retrier interface {
	// Retry reports whether the request should be re-issued, given the
	// response and error produced by the latest attempt.
	Retry(req *http.Request, resp *http.Response, err error) bool
}
// Requester is an optional extension for a Retrier that needs to adjust the
// request, or the request's context, before each attempt is executed.
type Requester interface {
	// Request receives the request about to be sent and returns the request
	// that should be sent in its place.
	Request(req *http.Request) *http.Request
}
// RetryPolicy is a factory that generates a Retrier. Retry.RoundTrip invokes
// each configured policy once per call, so a policy that builds a new Retrier
// on every invocation gives each round trip its own retry state.
type RetryPolicy func() Retrier
// Backoffer determines how much time to wait in between automated retries.
type Backoffer interface {
	// Backoff returns how long to wait before the next attempt, given the
	// request together with the response and error of the latest attempt.
	Backoff(req *http.Request, resp *http.Response, err error) time.Duration
}
// BackoffPolicy is a factory that generates a Backoffer. Retry.RoundTrip
// invokes the configured policy once per call, so a policy that builds a new
// Backoffer on every invocation gives each round trip its own backoff state.
type BackoffPolicy func() Backoffer
// LimitedRetrier wraps a series of retriers in a hard upper limit.
type LimitedRetrier struct {
// limit is the maximum number of Retry calls that are allowed to consult the
// wrapped retriers.
limit int
// attempts counts the Retry calls made so far while below the limit.
attempts int
// retries holds the wrapped retriers, consulted in order.
retries []Retrier
}
// NewLimitedRetryPolicy builds a RetryPolicy whose retriers consult the given
// policies but stop asking for retries once limit is reached. Every invocation
// of the returned policy calls each wrapped policy once and yields a new
// LimitedRetrier with its attempt counter at zero.
func NewLimitedRetryPolicy(limit int, policies ...RetryPolicy) RetryPolicy {
	return func() Retrier {
		wrapped := make([]Retrier, len(policies))
		for i := range policies {
			wrapped[i] = policies[i]()
		}
		return &LimitedRetrier{limit: limit, retries: wrapped}
	}
}
// Request implements Requester. The request is threaded, in order, through
// every wrapped retrier that is itself a Requester; wrapped retriers that are
// not are skipped.
func (r *LimitedRetrier) Request(req *http.Request) *http.Request {
	for i := range r.retries {
		requester, ok := r.retries[i].(Requester)
		if !ok {
			continue
		}
		req = requester.Request(req)
	}
	return req
}
// Retry reports whether any wrapped retrier wants the request re-issued, as
// long as the limit has not been reached. Each call made below the limit
// counts as an attempt, whatever the wrapped retriers answer. The wrapped
// retriers are consulted in order and the first one to ask for a retry ends
// the consultation. Once the limit is reached this method always returns
// false.
func (r *LimitedRetrier) Retry(req *http.Request, resp *http.Response, e error) bool {
	if r.attempts >= r.limit {
		return false
	}
	r.attempts++

	wanted := false
	for i := 0; i < len(r.retries) && !wanted; i++ {
		wanted = r.retries[i].Retry(req, resp, e)
	}
	return wanted
}
// StatusCodeRetrier retries based on HTTP status codes.
type StatusCodeRetrier struct {
// codes lists the response status codes that cause a retry.
codes []int
}
// Retry reports whether the response carries one of the status codes in the
// retry set. A missing response never triggers a retry; the request and the
// error are not inspected.
func (r *StatusCodeRetrier) Retry(req *http.Request, resp *http.Response, e error) bool {
	if resp == nil {
		return false
	}
	for i := range r.codes {
		if r.codes[i] == resp.StatusCode {
			return true
		}
	}
	return false
}
// NewStatusCodeRetryPolicy generates a RetryPolicy that retries on the
// specified status codes in the HTTP response. A single StatusCodeRetrier is
// created up front and handed out by every invocation of the policy.
func NewStatusCodeRetryPolicy(codes ...int) RetryPolicy {
	shared := &StatusCodeRetrier{codes: codes}
	return func() Retrier { return shared }
}
// TimeoutRetrier applies a timeout to requests and retries if the request
// took longer than the timeout duration.
type TimeoutRetrier struct {
// timeout is the duration applied to the context of each request passed to
// Request.
timeout time.Duration
}
// NewTimeoutRetryPolicy generates a RetryPolicy that ends a request after the
// given timeout duration and tries again. A single TimeoutRetrier is created
// up front and handed out by every invocation of the policy.
func NewTimeoutRetryPolicy(timeout time.Duration) RetryPolicy {
	shared := &TimeoutRetrier{timeout: timeout}
	return func() Retrier { return shared }
}
// Retry reports whether the attempt failed because a context deadline was
// exceeded. The request and response are not inspected.
//
// The error is matched with errors.Is rather than ==, so a deadline error that
// an underlying transport has wrapped with additional context is still
// recognised. A nil error never triggers a retry.
func (r *TimeoutRetrier) Retry(req *http.Request, resp *http.Response, e error) bool {
	return errors.Is(e, context.DeadlineExceeded)
}
// Request returns a copy of req whose context expires once the configured
// timeout has elapsed. The cancel function of the derived context is
// discarded, so the context is only released when the timeout fires or when
// the context req arrived with is cancelled.
func (r *TimeoutRetrier) Request(req *http.Request) *http.Request {
	parent := req.Context()
	bounded, _ := context.WithTimeout(parent, r.timeout) // nolint
	return req.WithContext(bounded)
}
// FixedBackoffer signals the client to wait for a static amount of time.
type FixedBackoffer struct {
// wait is the duration returned by every call to Backoff.
wait time.Duration
}
// NewFixedBackoffPolicy generates a BackoffPolicy that always returns the
// same value. A single FixedBackoffer is created up front and handed out by
// every invocation of the policy.
func NewFixedBackoffPolicy(wait time.Duration) BackoffPolicy {
	shared := &FixedBackoffer{wait: wait}
	return func() Backoffer { return shared }
}
// Backoff returns the configured static wait. The request, response and error
// of the latest attempt are ignored.
func (b *FixedBackoffer) Backoff(_ *http.Request, _ *http.Response, _ error) time.Duration {
	return b.wait
}
// ExponentialBackoffer signals the client to wait for an initial amount of time,
// doubling every retry.
type ExponentialBackoffer struct {
// wait is the duration the next call to Backoff returns; Backoff doubles it on
// every call.
wait time.Duration
}
// NewExponentialBackoffPolicy generates a BackoffPolicy whose backoffers return
// double the value returned in the previous call, using the wait parameter as
// the initial backoff value. Every invocation of the policy yields a new
// ExponentialBackoffer that starts again from wait.
func NewExponentialBackoffPolicy(wait time.Duration) BackoffPolicy {
	return func() Backoffer {
		fresh := ExponentialBackoffer{wait: wait}
		return &fresh
	}
}
// Backoff returns the current wait and doubles the stored value, so that the
// following call returns twice as much. The request, response and error of the
// latest attempt are ignored.
func (b *ExponentialBackoffer) Backoff(*http.Request, *http.Response, error) time.Duration {
	due := b.wait
	b.wait += due
	return due
}
// PercentJitteredBackoffer adjusts the backoff time by a random amount within
// N percent of the duration to help with thundering herds.
type PercentJitteredBackoffer struct {
// wrapped supplies the base duration that the jitter is applied to.
wrapped Backoffer
// jitter is the fraction of the base duration used as the jitter window.
jitter float64
// random supplies the random values used to pick the size and the direction
// of the jitter.
random func() float64
}
// NewPercentJitteredBackoffPolicy wraps any backoff policy and applies a
// percentage based jitter to the original policy's value. The percentage float
// should be between 0 and 1. The jitter will be applied as a positive and
// negative value equally.
//
// Every invocation of the returned policy invokes the wrapped policy once and
// yields a new PercentJitteredBackoffer that draws its random values from
// rand.Float64.
func NewPercentJitteredBackoffPolicy(wrapped BackoffPolicy, jitterPercent float64) BackoffPolicy {
	build := func() Backoffer {
		jittered := new(PercentJitteredBackoffer)
		jittered.wrapped = wrapped()
		jittered.jitter = jitterPercent
		jittered.random = rand.Float64
		return jittered
	}
	return build
}
// calculateJitteredBackoff shifts original by a random offset of at most
// percentage times original. random is called exactly twice: the first value
// scales the offset within that window and the second picks the direction,
// with values above .5 subtracting the offset and all others adding it.
func calculateJitteredBackoff(original time.Duration, percentage float64, random func() float64) time.Duration {
	window := time.Duration(percentage * float64(original))
	offset := time.Duration(random() * float64(window))
	if negate := random() > .5; negate {
		return original - offset
	}
	return original + offset
}
// Backoff asks the wrapped backoffer for its duration and returns that value
// with the configured jitter applied.
func (b *PercentJitteredBackoffer) Backoff(r *http.Request, response *http.Response, e error) time.Duration {
	return calculateJitteredBackoff(b.wrapped.Backoff(r, response, e), b.jitter, b.random)
}
// Retry is a wrapper for applying various retry policies to requests.
type Retry struct {
// wrapped is the round tripper that performs every individual attempt.
wrapped http.RoundTripper
// backoffPolicy is invoked once per RoundTrip call to obtain the Backoffer
// that spaces out the attempts of that call.
backoffPolicy BackoffPolicy
// retryPolicies are each invoked once per RoundTrip call to obtain the
// retriers consulted after every attempt of that call.
retryPolicies []RetryPolicy
}
// RoundTrip executes a request and applies one or more retry policies.
//
// The request body is buffered up front so that every attempt can send a fresh
// copy of the request. For every call a new Retrier is obtained from each
// retry policy and a new Backoffer from the backoff policy, so retry state
// such as attempt counters is not shared between calls. Any Retrier that also
// implements Requester gets to adjust the request of every attempt before it
// is sent.
//
// While at least one Retrier asks for a retry, the backoff duration is
// computed, the body of the rejected response is closed, the context of the
// rejected attempt is cancelled, and the next attempt is issued once the
// backoff has elapsed. If the context of r ends during the backoff, that
// context's error is returned without a response.
//
// The returned response and error are those of the final attempt. When the
// final attempt fails its context is cancelled. When it succeeds the context
// is deliberately left uncancelled, because the caller still has to read the
// response body; that context ends when the context of r ends.
func (c *Retry) RoundTrip(r *http.Request) (*http.Response, error) {
	parentCtx := r.Context()
	copier, e := newRequestCopier(r)
	if e != nil {
		return nil, e
	}

	backoffer := c.backoffPolicy()
	retriers := make([]Retrier, 0, len(c.retryPolicies))
	for _, retryPolicy := range c.retryPolicies {
		retriers = append(retriers, retryPolicy())
	}

	// attempt sends one fresh copy of the request under its own cancellable
	// context and hands back the cancel function for that context.
	attempt := func() (*http.Response, context.CancelFunc, error) {
		requestCtx, cancel := context.WithCancel(parentCtx)
		req := copier.Copy().WithContext(requestCtx)
		for _, retrier := range retriers {
			if requester, ok := retrier.(Requester); ok {
				req = requester.Request(req)
			}
		}
		response, err := c.wrapped.RoundTrip(req)
		return response, cancel, err
	}

	response, cancel, e := attempt()
	for c.shouldRetry(r, response, e, retriers) {
		// Let the backoffer see the rejected attempt before it is released.
		wait := backoffer.Backoff(r, response, e)

		// The rejected response is never handed to the caller, so its body has
		// to be closed here; otherwise the underlying connection stays tied up.
		// Closing without draining can not block on a slow server.
		if response != nil && response.Body != nil {
			_ = response.Body.Close()
		}
		cancel()

		timer := time.NewTimer(wait)
		select {
		case <-parentCtx.Done():
			timer.Stop()
			return nil, parentCtx.Err()
		case <-timer.C:
		}

		response, cancel, e = attempt()
	}
	if e != nil {
		cancel()
	}
	// On success cancel is intentionally not called: doing so could interrupt
	// the caller while it reads the response body.
	return response, e
}
// shouldRetry reports whether any of the given retriers asks for the request
// to be re-issued. The retriers are consulted in order and the first one to
// ask for a retry ends the consultation.
func (c *Retry) shouldRetry(r *http.Request, response *http.Response, e error, retriers []Retrier) bool {
	for i := 0; i < len(retriers); i++ {
		if !retriers[i].Retry(r, response, e) {
			continue
		}
		return true
	}
	return false
}
// NewRetrier configures a RoundTripper decorator to perform some number of
// retries. The returned function wraps the round tripper it is given in a new
// Retry that uses the supplied backoff and retry policies.
func NewRetrier(backoffPolicy BackoffPolicy, retryPolicies ...RetryPolicy) func(http.RoundTripper) http.RoundTripper {
	decorate := func(wrapped http.RoundTripper) http.RoundTripper {
		retry := new(Retry)
		retry.wrapped = wrapped
		retry.backoffPolicy = backoffPolicy
		retry.retryPolicies = retryPolicies
		return retry
	}
	return decorate
}