forked from blaines/tasque-go
-
Notifications
You must be signed in to change notification settings - Fork 1
/
sfn_handler.go
126 lines (106 loc) · 3.16 KB
/
sfn_handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package main
import (
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sfn"
)
// SFNHandler hello world
type SFNHandler struct {
client sfn.SFN
messageBody string
taskToken string
activityARN string
awsRegion string
}
func (handler *SFNHandler) id() *string {
// There's no real use for the full token
token := handler.taskToken[0:32]
return &token
}
func (handler *SFNHandler) body() *string {
return &handler.messageBody
}
func (handler *SFNHandler) initialize() {
log.Printf("Configuring handler. activityARN:%s", handler.activityARN)
sess, err := session.NewSession(&aws.Config{Region: aws.String(strings.Split(handler.activityARN, ":")[3])})
if err != nil {
fmt.Println("failed to create session,", err)
panic("failed to create session")
}
// client := sfn.New(sess, &aws.Config{
// MaxRetries: aws.Int(30),
// HTTPClient: &http.Client{
// Timeout: 30 * time.Second,
// },
// })
client := sfn.New(sess)
handler.newClient(*client)
}
func (handler *SFNHandler) newClient(client sfn.SFN) {
handler.client = client
}
func (handler *SFNHandler) receive() bool {
for {
log.Printf("Waiting for SFN activity data from %s", handler.activityARN)
hostname, _ := os.Hostname()
getActivityTaskParams := &sfn.GetActivityTaskInput{
ActivityArn: aws.String(handler.activityARN),
WorkerName: aws.String(hostname),
}
receiveMessageResponse, receiveMessageError := handler.client.GetActivityTask(getActivityTaskParams)
if receiveMessageError != nil {
// Print the error, cast err to awserr.Error to get the Code and
// Message from an error.
log.Fatal("E: ", receiveMessageError.Error())
return false
}
if receiveMessageResponse.TaskToken != nil {
handler.messageBody = *receiveMessageResponse.Input
handler.taskToken = *receiveMessageResponse.TaskToken
writeFileError := ioutil.WriteFile("payload.json", []byte(handler.messageBody), 0644)
if writeFileError != nil {
panic(writeFileError)
}
return true
}
}
}
func (handler *SFNHandler) success(result *string) {
sendTaskSuccessParams := &sfn.SendTaskSuccessInput{
Output: aws.String(handler.messageBody),
TaskToken: aws.String(handler.taskToken),
}
if result != nil && *result != "" {
sendTaskSuccessParams.Output = aws.String(*result)
}
_, deleteMessageError := handler.client.SendTaskSuccess(sendTaskSuccessParams)
if deleteMessageError != nil {
return
}
}
func (handler *SFNHandler) failure(err Result) {
sendTaskFailureParams := &sfn.SendTaskFailureInput{
TaskToken: aws.String(handler.taskToken),
Error: aws.String(err.Error),
Cause: aws.String(err.Message()),
}
_, deleteMessageError := handler.client.SendTaskFailure(sendTaskFailureParams)
if deleteMessageError != nil {
log.Printf("Couldn't send task failure %+v", deleteMessageError)
return
}
}
func (handler *SFNHandler) heartbeat() {
sendTaskHeartbeatParams := &sfn.SendTaskHeartbeatInput{
TaskToken: aws.String(handler.taskToken),
}
_, deleteMessageError := handler.client.SendTaskHeartbeat(sendTaskHeartbeatParams)
if deleteMessageError != nil {
return
}
}