-
Notifications
You must be signed in to change notification settings - Fork 12
/
async.go
148 lines (126 loc) · 3.73 KB
/
async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package main
import (
"errors"
"log"
"net/http"
"os"
"os/signal"
"sync"
"time"
)
// asyncResponse pairs the Response produced by one try of an operation with a
// flag telling the scheduling helpers in this file whether another try may be
// made.
type asyncResponse struct {
// Response is the embedded result of the try.
Response
// MayBeRetried is consulted by delayWithRetries and asyncDelayWithRetries:
// another try is scheduled only when it is true and delays remain.
MayBeRetried bool
}
// asyncErrorResponse is the error-flavoured counterpart of asyncResponse: an
// ErrorResponse plus a flag saying whether the failed operation may be tried
// again. Use toAsyncResponse to convert it into an asyncResponse.
type asyncErrorResponse struct {
// ErrorResponse is the embedded error being reported.
ErrorResponse
// MayBeRetried is copied verbatim into the asyncResponse built by
// toAsyncResponse.
MayBeRetried bool
}
// MaybeSyncResponse can be returned from operations which may or may not
// complete synchronously. When OperationFinishedSynchronously is true,
// Response is specified; when it is false the work has been handed off to run
// in the background and Response is left unset.
type MaybeSyncResponse struct {
// Response is the embedded result; only populated for synchronous completion.
Response
// OperationFinishedSynchronously is true when the result is already
// available in Response.
OperationFinishedSynchronously bool
}
// syncResponse wraps response in a MaybeSyncResponse that is marked as having
// finished synchronously.
func syncResponse(response Response) MaybeSyncResponse {
	var finished MaybeSyncResponse
	finished.Response = response
	finished.OperationFinishedSynchronously = true
	return finished
}
// delayWithRetries runs operation according to the schedule in tryDelays,
// where tryDelays[i] is the wait before try i.
//
// If the first delay is zero the first try is executed on the calling
// goroutine. Its result is returned synchronously unless it is marked
// MayBeRetried and further delays remain, in which case the remaining tries
// are scheduled in the background. If the first delay is non-zero the whole
// schedule is handed to asyncDelayWithRetries.
//
// The returned MaybeSyncResponse has OperationFinishedSynchronously set to
// true when it carries a final result (including scheduling errors, which are
// reported as an ErrorResponse with http.StatusInternalServerError) and false
// when work continues in the background. asyncOperationWg is passed through to
// the background scheduler.
func delayWithRetries(tryDelays []time.Duration, operation func() asyncResponse,
	asyncOperationWg *sync.WaitGroup) MaybeSyncResponse {
	if len(tryDelays) == 0 {
		return syncResponse(ErrorResponse{
			Code:         http.StatusInternalServerError,
			ErrorMessage: "Cannot schedule any delayed operations when tryDelays is empty",
		})
	}

	// A non-zero first delay means nothing runs on the caller's goroutine:
	// schedule every try in the background.
	if tryDelays[0] != 0 {
		if err := asyncDelayWithRetries(tryDelays, operation, asyncOperationWg); err != nil {
			return syncResponse(
				ErrorResponse{err, http.StatusInternalServerError, "Failed to schedule async delay with retries"},
			)
		}
		return MaybeSyncResponse{OperationFinishedSynchronously: false}
	}

	// Zero first delay: perform the first try right now.
	firstTry := operation()
	if len(tryDelays) <= 1 || !firstTry.MayBeRetried {
		return syncResponse(firstTry)
	}

	// The first try asked for a retry and delays remain; the rest of the
	// schedule runs asynchronously.
	log.Println("Operation will be retried")
	if err := asyncDelayWithRetries(tryDelays[1:], operation, asyncOperationWg); err != nil {
		return syncResponse(
			ErrorResponse{err, http.StatusInternalServerError, "Failed to schedule async retries"},
		)
	}
	return MaybeSyncResponse{OperationFinishedSynchronously: false}
}
// asyncDelayWithRetries schedules operation to run in the background after
// tryDelays[0] has elapsed. Each try's Response is passed to
// handleAsyncResponse. If the try reports MayBeRetried and more delays remain,
// the next try is scheduled with the rest of tryDelays.
//
// It returns an error only when tryDelays is empty; otherwise the first try
// has been scheduled and nil is returned. Failures to schedule a later try
// cannot be returned to the caller (they happen on the background goroutine),
// so they are logged together with the underlying error.
//
// asyncOperationWg is handed to delay, which tracks the background goroutine
// on it.
func asyncDelayWithRetries(tryDelays []time.Duration, operation func() asyncResponse,
	asyncOperationWg *sync.WaitGroup) error {
	if len(tryDelays) < 1 {
		return errors.New("Cannot schedule any delayed operations when tryDelays is empty")
	}

	delay(tryDelays[0], func() {
		response := operation()
		handleAsyncResponse(response.Response)

		if len(tryDelays) <= 1 || !response.MayBeRetried {
			return
		}

		log.Println("Operation will be retried")
		// The next try is registered from inside the current delayed
		// operation, i.e. before this goroutine is marked done on the
		// WaitGroup by delay.
		if err := asyncDelayWithRetries(tryDelays[1:], operation, asyncOperationWg); err != nil {
			// Include err so the reason for the failure is not lost.
			log.Printf("Failed to schedule another try to start in %s: %v\n", tryDelays[1], err)
		}
	}, asyncOperationWg)

	log.Printf("Scheduled an asynchronous operation to start in %s\n", tryDelays[0])
	return nil
}
// delay runs operation on a new goroutine once duration has elapsed, or
// immediately if the process receives an interrupt signal (os.Interrupt)
// before the timer fires.
//
// asyncOperationWg is incremented before the goroutine is started and
// decremented after operation returns, so callers can Wait on it to let
// scheduled work finish. delay itself returns without blocking.
func delay(duration time.Duration, operation func(), asyncOperationWg *sync.WaitGroup) {
	// Buffered so a signal delivered before the goroutine reaches its select
	// is not dropped.
	interruptChan := make(chan os.Signal, 1)
	signal.Notify(interruptChan, os.Interrupt)
	timer := time.NewTimer(duration)

	asyncOperationWg.Add(1)
	go func() {
		defer asyncOperationWg.Done()
		// Avoid leaking channels: unregister interruptChan once the
		// operation has finished.
		defer signal.Stop(interruptChan)

		// Block until either of the 2 channels receives.
		select {
		case <-interruptChan:
			// The timer has not fired; stop it so it is released now
			// instead of staying pending for the rest of duration.
			timer.Stop()
			log.Println("Received an interrupt signal (SIGINT). Starting a scheduled process immediately.")
		case <-timer.C:
		}
		operation()
	}()
}
// retriable wraps response in an asyncResponse whose MayBeRetried flag is set,
// signalling that the operation may be tried again.
func retriable(response Response) asyncResponse {
	wrapped := asyncResponse{Response: response}
	wrapped.MayBeRetried = true
	return wrapped
}
// nonRetriable wraps response in an asyncResponse whose MayBeRetried flag is
// cleared, signalling that the operation must not be tried again.
func nonRetriable(response Response) asyncResponse {
	var wrapped asyncResponse
	wrapped.Response = response
	wrapped.MayBeRetried = false
	return wrapped
}
// retriableError returns a pointer to a new asyncErrorResponse holding errResp
// with MayBeRetried set to true.
func retriableError(errResp ErrorResponse) *asyncErrorResponse {
	wrapped := new(asyncErrorResponse)
	wrapped.ErrorResponse = errResp
	wrapped.MayBeRetried = true
	return wrapped
}
// nonRetriableError returns a pointer to a new asyncErrorResponse holding
// errResp with MayBeRetried set to false.
func nonRetriableError(errResp ErrorResponse) *asyncErrorResponse {
	wrapped := new(asyncErrorResponse)
	wrapped.ErrorResponse = errResp
	wrapped.MayBeRetried = false
	return wrapped
}
// toAsyncResponse converts the error response into an asyncResponse, using the
// embedded ErrorResponse as the Response and carrying MayBeRetried over
// unchanged.
func (a asyncErrorResponse) toAsyncResponse() asyncResponse {
	var converted asyncResponse
	converted.Response = a.ErrorResponse
	converted.MayBeRetried = a.MayBeRetried
	return converted
}