forked from influxdata/influxdb
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtx_test.go
93 lines (78 loc) · 3.78 KB
/
tx_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package influxdb_test
import (
"reflect"
"sort"
"testing"
"github.com/influxdb/influxdb"
"github.com/influxdb/influxdb/influxql"
)
// TestTx_CreateIterators checks that a transaction can build iterators for a
// simple SELECT statement and that draining them yields the expected rows.
//
// NOTE: the test is skipped unconditionally by the t.Skip() call below.
func TestTx_CreateIterators(t *testing.T) {
	t.Skip()

	srv := OpenDefaultServer(NewMessagingClient())
	defer srv.Close()

	// write stores a single point in the "raw" retention policy of database "db".
	write := func(p influxdb.Point) {
		srv.MustWriteSeries("db", "raw", []influxdb.Point{p})
	}

	// Points in the us-east region.
	write(influxdb.Point{
		Name:      "cpu",
		Tags:      map[string]string{"region": "us-east", "host": "serverA", "service": "redis"},
		Timestamp: mustParseTime("2000-01-01T00:00:00Z"),
		Fields:    map[string]interface{}{"value": float64(100)},
	})
	write(influxdb.Point{
		Name:      "cpu",
		Tags:      map[string]string{"region": "us-east", "host": "serverB", "service": "redis"},
		Timestamp: mustParseTime("2000-01-01T00:00:10Z"),
		Fields:    map[string]interface{}{"value": float64(90)},
	})
	write(influxdb.Point{
		Name:      "cpu",
		Tags:      map[string]string{"region": "us-east", "host": "serverC"},
		Timestamp: mustParseTime("2000-01-01T00:00:20Z"),
		Fields:    map[string]interface{}{"value": float64(80)},
	})
	write(influxdb.Point{
		Name:      "cpu",
		Tags:      map[string]string{"region": "us-east", "host": "serverA", "service": "redis"},
		Timestamp: mustParseTime("2000-01-01T00:00:30Z"),
		Fields:    map[string]interface{}{"value": float64(70)},
	})

	// Points in the us-west region.
	write(influxdb.Point{
		Name:      "cpu",
		Tags:      map[string]string{"region": "us-west", "host": "serverD"},
		Timestamp: mustParseTime("2000-01-01T00:00:00Z"),
		Fields:    map[string]interface{}{"value": float64(1)},
	})
	write(influxdb.Point{
		Name:      "cpu",
		Tags:      map[string]string{"region": "us-west", "host": "serverE", "service": "redis"},
		Timestamp: mustParseTime("2000-01-01T00:00:00Z"),
		Fields:    map[string]interface{}{"value": float64(2)},
	})

	// A more deeply nested WHERE clause that is currently disabled:
	//
	// stmt := MustParseSelectStatement(`
	// SELECT value
	// FROM "db"."raw"."cpu"
	// WHERE (((service = 'redis' AND region = 'us-west' AND value > 20) AND host = 'serverE') OR service='redis') AND (time >= '2000-01-01' AND time < '2000-01-02')
	// GROUP BY time(1h), region`)

	// The statement under test. The raw string is kept flush-left so its
	// contents stay exactly as written.
	query := MustParseSelectStatement(`
SELECT value
FROM "db"."raw"."cpu"
WHERE (service = 'redis' AND value < 100) AND (time >= '2000-01-01' AND time < '2000-01-02')
GROUP BY time(1h), region`)

	// Start a transaction and pin its notion of "now".
	tx, err := srv.Begin()
	if err != nil {
		t.Fatalf("unexpected error: %s", err)
	}
	tx.SetNow(mustParseTime("2000-01-01T00:00:00Z"))

	// Build the iterators; exactly two are expected
	// (presumably one per region group - confirm against Tx.CreateIterators).
	iterators, err := tx.CreateIterators(query)
	if err != nil {
		t.Fatalf("unexpected error: %#v", err)
	}
	if count := len(iterators); count != 2 {
		t.Fatalf("iterator count: %d", count)
	}

	// The transaction is opened only after the iterators have been created.
	if err := tx.Open(); err != nil {
		t.Fatalf("tx open error: %s", err)
	}
	defer tx.Close()

	// Drain every iterator and compare against the rows that satisfy the
	// WHERE clause (service = 'redis' AND value < 100).
	rows := slurp(iterators)
	expected := []keyValue{
		{key: 946684800000000000, value: float64(2), tags: "\x00\aus-west"},
		{key: 946684810000000000, value: float64(90), tags: "\x00\aus-east"},
		{key: 946684830000000000, value: float64(70), tags: "\x00\aus-east"},
	}
	if !reflect.DeepEqual(rows, expected) {
		t.Fatalf("unexpected data: %#v", rows)
	}
}
// slurp drains each iterator in itrs in turn, recording every key/value pair
// together with the tags reported by the iterator that produced it, and
// returns all rows sorted through keyValueSlice.
//
// An iterator is treated as exhausted as soon as Next returns a zero key.
func slurp(itrs []influxql.Iterator) []keyValue {
	var collected []keyValue
	for idx := range itrs {
		it := itrs[idx]
		for {
			key, _, val := it.Next()
			if key == 0 {
				break
			}
			collected = append(collected, keyValue{key: key, value: val, tags: it.Tags()})
		}
	}
	sort.Sort(keyValueSlice(collected))
	return collected
}
// keyValue is one row read from an iterator by slurp.
type keyValue struct {
	key   int64       // first value returned by Iterator.Next; the test compares it against nanosecond Unix timestamps
	value interface{} // third value returned by Iterator.Next
	tags  string      // result of Iterator.Tags for the iterator that produced the row
}

// keyValueSlice implements sort.Interface, ordering rows by key and breaking
// ties on tags.
type keyValueSlice []keyValue

// Len returns the number of rows.
func (p keyValueSlice) Len() int { return len(p) }

// Less reports whether row i sorts before row j. Rows are compared by key
// first; tags are consulted only when the keys are equal. Comparing the two
// fields independently (key < key || tags < tags) would let both Less(i, j)
// and Less(j, i) be true, which violates the ordering sort.Sort relies on.
func (p keyValueSlice) Less(i, j int) bool {
	if p[i].key != p[j].key {
		return p[i].key < p[j].key
	}
	return p[i].tags < p[j].tags
}

// Swap exchanges rows i and j.
func (p keyValueSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }