-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathinstance.go
143 lines (125 loc) · 3.49 KB
/
instance.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
package gomaxcompute
import (
"encoding/base64"
"encoding/xml"
"fmt"
"strings"
"github.com/pkg/errors"
)
// instanceStatus is the XML document shape that getInstanceStatus decodes a
// response body into.
type instanceStatus struct {
// XMLName binds this type to an <Instance> root element.
XMLName xml.Name `xml:"Instance"`
// Status holds the text of the <Status> child element.
Status string `xml:"Status"`
}
// Result mirrors a <Result> element: its character data plus its Transform
// and Format attributes. decodeInstanceResult inspects Format ("text" or
// "csv") and Transform ("" or "Base64") to decide how to interpret Content.
type Result struct {
// Content is the element's character data (tagged as CDATA).
Content string `xml:",cdata"`
// Transform is the value of the element's "Transform" attribute.
Transform string `xml:"Transform,attr"`
// Format is the value of the element's "Format" attribute.
Format string `xml:"Format,attr"`
}
// instanceResult is the XML document shape that decodeInstanceResult decodes
// an instance result payload into.
type instanceResult struct {
// XMLName binds this type to an <Instance> root element.
XMLName xml.Name `xml:"Instance"`
// Result is read from the nested element path Tasks > Task > Result.
Result Result `xml:"Tasks>Task>Result"`
}
// instanceErrorMessage wraps the character data of an element; it is used as
// the type of instanceError.Message.
type instanceErrorMessage struct {
// CDATA is the element's character data (tagged as CDATA).
CDATA string `xml:",cdata"`
}
// instanceError is the XML shape of an <Error> document with Code, Message,
// RequestId and HostId child elements.
// NOTE(review): presumably this is the service's error response body; its
// use is not visible in this part of the file — confirm against callers.
type instanceError struct {
// XMLName binds this type to an <Error> root element.
XMLName xml.Name `xml:"Error"`
// Code holds the text of the <Code> child element.
Code string `xml:"Code"`
// Message holds the character data of the <Message> child element.
Message instanceErrorMessage `xml:"Message"`
// RequestId holds the text of the <RequestId> child element.
RequestId string `xml:"RequestId"`
// HostId holds the text of the <HostId> child element.
HostId string `xml:"HostId"`
}
// createInstance posts job.XML() to the "/instances" resource and returns the
// ID of the created instance, taken from the last path segment of the
// response's "Location" header.
//
// It fails when job is nil, when the request fails, when parseResponse
// reports any error other than errNilBody, or when no instance ID can be
// extracted from the header.
//
// NOTE(review): the previous comment here read "instance types: SQL";
// presumably the instance type is determined by the job's XML — confirm.
func (conn *odpsConn) createInstance(job *odpsJob) (string, error) {
	if job == nil {
		return "", errors.New("nil job")
	}

	rsp, err := conn.request(methodPost, conn.resource("/instances"), job.XML())
	if err != nil {
		return "", err
	}

	// Only the response headers are needed, so errNilBody is tolerated.
	if _, perr := parseResponse(rsp); perr != nil && perr != errNilBody {
		return "", perr
	}

	// The instance ID is the trailing segment of the "Location" header.
	if id := location2InstanceID(rsp.Header.Get("Location")); id != "" {
		return id, nil
	}
	return "", errors.New("no instance id")
}
// location2InstanceID returns whatever follows the final "/" in location.
// It returns "" when location contains no "/" at all, and also when the
// final "/" is the last character.
func location2InstanceID(location string) string {
	slash := strings.LastIndex(location, "/")
	if slash < 0 {
		// No separator: there is no path segment to treat as an ID.
		return ""
	}
	return location[slash+1:]
}
// getInstanceStatus issues a GET on "/instances/<instanceID>", decodes the
// response body as an instanceStatus document and returns its Status text.
// Any error from the request, from parseResponse or from XML decoding is
// returned unchanged together with an empty status.
//
// Per the original note, the status is one of Running, Suspended or
// Terminated.
func (conn *odpsConn) getInstanceStatus(instanceID string) (string, error) {
	rsp, err := conn.request(methodGet, conn.resource("/instances/"+instanceID), nil)
	if err != nil {
		return "", err
	}

	payload, err := parseResponse(rsp)
	if err != nil {
		return "", err
	}

	status := instanceStatus{}
	if err := xml.Unmarshal(payload, &status); err != nil {
		return "", err
	}
	return status.Status, nil
}
// getInstanceResult issues a GET on "/instances/<instanceID>" with the
// "result" key attached to the resource, and passes the response body to
// decodeInstanceResult.
//
// Per the original notes: the call is valid once the instance status is
// `Terminated`, the result is limited to 10000 records, and the result is
// delivered as a string.
func (conn *odpsConn) getInstanceResult(instanceID string) (string, error) {
	rsp, err := conn.request(
		methodGet,
		conn.resource("/instances/"+instanceID, pair{k: "result"}),
		nil,
	)
	if err != nil {
		return "", err
	}

	raw, err := parseResponse(rsp)
	if err == nil {
		return decodeInstanceResult(raw)
	}
	return "", err
}
// decodeInstanceResult decodes an instance result XML document and returns
// the content of its Result element.
//
// Handling depends on the Result's Format attribute:
//   - "text": the content is logged at debug level. If it starts with
//     "ODPS-" it is reported as an error: a *MaxcomputeError when
//     parseErrorCode succeeds, otherwise a stack-annotated error carrying the
//     raw text. Any other text yields ("", nil).
//   - "csv": the content is returned as is when Transform is empty, or
//     base64-decoded (standard encoding) when Transform is "Base64"; any
//     other Transform is an error.
//   - anything else: an "unsupported format" error.
//
// XML and base64 decoding errors are returned unchanged.
func decodeInstanceResult(result []byte) (string, error) {
	var doc instanceResult
	if err := xml.Unmarshal(result, &doc); err != nil {
		return "", err
	}
	res := doc.Result

	switch res.Format {
	case "text":
		log.Debug(res.Content)
		// ODPS errors are text beginning with "ODPS-".
		if !strings.HasPrefix(res.Content, "ODPS-") {
			// FIXME(tony): the result of a non-query statement is usually in
			// text format. Go's database/sql API only supports lastId and
			// affectedRows.
			return "", nil
		}
		code, err := parseErrorCode(res.Content)
		if err != nil {
			return "", errors.WithStack(errors.New(res.Content))
		}
		return "", &MaxcomputeError{code, res.Content}
	case "csv":
		// Handled below, according to the Transform attribute.
	default:
		return "", errors.WithStack(fmt.Errorf("unsupported format %v", res.Format))
	}

	if res.Transform == "" {
		return res.Content, nil
	}
	if res.Transform != "Base64" {
		return "", errors.WithStack(fmt.Errorf("unsupported transform %v", res.Transform))
	}

	decoded, err := base64.StdEncoding.DecodeString(res.Content)
	if err != nil {
		return "", err
	}
	return string(decoded), nil
}