-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Graphite Integration Test #1758
Changes from 6 commits
e3d74f6
019b9de
c7068b7
174a300
5f8b176
98484b6
3cce599
c46caf8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -5,12 +5,15 @@ import ( | |
"encoding/json" | ||
"fmt" | ||
"io/ioutil" | ||
"math" | ||
"net" | ||
"net/http" | ||
"net/url" | ||
"os" | ||
"path/filepath" | ||
"strconv" | ||
"strings" | ||
"sync/atomic" | ||
"testing" | ||
"time" | ||
|
||
|
@@ -66,7 +69,7 @@ type Cluster []*Node | |
// the testing is marked as failed. | ||
// | ||
// This function returns a slice of nodes, the first of which will be the leader. | ||
func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, basePort int) Cluster { | ||
func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, basePort int, baseConfig *main.Config) Cluster { | ||
t.Logf("Creating cluster of %d nodes for test %s", nNodes, testName) | ||
if nNodes < 1 { | ||
t.Fatalf("Test %s: asked to create nonsense cluster", testName) | ||
|
@@ -85,7 +88,10 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba | |
_ = os.RemoveAll(tmpDataDir) | ||
|
||
// Create the first node, special case. | ||
c := main.NewConfig() | ||
c := baseConfig | ||
if c == nil { | ||
c = main.NewConfig() | ||
} | ||
c.Broker.Dir = filepath.Join(tmpBrokerDir, strconv.Itoa(basePort)) | ||
c.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(basePort)) | ||
c.Broker.Port = basePort | ||
|
@@ -167,9 +173,6 @@ func write(t *testing.T, node *Node, data string) { | |
body, _ := ioutil.ReadAll(resp.Body) | ||
t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d, %s", http.StatusOK, resp.StatusCode, string(body)) | ||
} | ||
|
||
// Until races are solved. | ||
time.Sleep(3 * time.Second) | ||
} | ||
|
||
// query executes the given query against all nodes in the cluster, and verifies no errors occured, and | ||
|
@@ -202,6 +205,38 @@ func query(t *testing.T, nodes Cluster, urlDb, query, expected string) (string, | |
return "", true | ||
} | ||
|
||
// queryAndWait executes the given query against all nodes in the cluster, and verifies no errors occured, and | ||
// ensures the returned data is as expected until the timeout occurs | ||
func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected string, timeout time.Duration) (string, bool) { | ||
v := url.Values{"q": []string{q}} | ||
if urlDb != "" { | ||
v.Set("db", urlDb) | ||
} | ||
|
||
var ( | ||
timedOut int32 | ||
timer = time.NewTimer(time.Duration(math.MaxInt64)) | ||
) | ||
defer timer.Stop() | ||
if timeout > 0 { | ||
timer.Reset(time.Duration(timeout) * time.Millisecond) | ||
go func() { | ||
<-timer.C | ||
atomic.StoreInt32(&timedOut, 1) | ||
}() | ||
} | ||
|
||
for { | ||
if got, ok := query(t, nodes, urlDb, q, expected); ok { | ||
return got, ok | ||
} else if atomic.LoadInt32(&timedOut) == 1 { | ||
return fmt.Sprintf("timed out before expected result was found: got: %s", got), false | ||
} else { | ||
time.Sleep(100 * time.Millisecond) | ||
} | ||
} | ||
} | ||
|
||
// runTests_Errors tests some basic error cases. | ||
func runTests_Errors(t *testing.T, nodes Cluster) { | ||
t.Logf("Running tests against %d-node cluster", len(nodes)) | ||
|
@@ -744,7 +779,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent | |
if tt.queryDb != "" { | ||
urlDb = tt.queryDb | ||
} | ||
got, ok := query(t, nodes, rewriteDbRp(urlDb, database, retention), rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention)) | ||
got, ok := queryAndWait(t, nodes, rewriteDbRp(urlDb, database, retention), rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention), 3*time.Second) | ||
if !ok { | ||
t.Errorf("Test \"%s\" failed\n exp: %s\n got: %s\n", name, rewriteDbRp(tt.expected, database, retention), got) | ||
} | ||
|
@@ -762,7 +797,7 @@ func TestSingleServer(t *testing.T) { | |
os.RemoveAll(dir) | ||
}() | ||
|
||
nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090) | ||
nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090, nil) | ||
|
||
runTestsData(t, testName, nodes, "mydb", "myrp") | ||
} | ||
|
@@ -778,7 +813,7 @@ func Test3NodeServer(t *testing.T) { | |
os.RemoveAll(dir) | ||
}() | ||
|
||
nodes := createCombinedNodeCluster(t, testName, dir, 3, 8190) | ||
nodes := createCombinedNodeCluster(t, testName, dir, 3, 8190, nil) | ||
|
||
runTestsData(t, testName, nodes, "mydb", "myrp") | ||
} | ||
|
@@ -797,7 +832,7 @@ func TestClientLibrary(t *testing.T) { | |
retentionPolicy := "myrp" | ||
now := time.Now().UTC() | ||
|
||
nodes := createCombinedNodeCluster(t, testName, dir, 1, 8290) | ||
nodes := createCombinedNodeCluster(t, testName, dir, 1, 8290, nil) | ||
createDatabase(t, testName, nodes, database) | ||
createRetentionPolicy(t, testName, nodes, database, retentionPolicy) | ||
|
||
|
@@ -868,6 +903,56 @@ func TestClientLibrary(t *testing.T) { | |
} | ||
} | ||
|
||
func Test_ServerSingleGraphiteIntegration(t *testing.T) { | ||
if testing.Short() { | ||
t.Skip() | ||
} | ||
nNodes := 1 | ||
basePort := 8390 | ||
testName := "graphite integration" | ||
dir := tempfile() | ||
now := time.Now().UTC().Round(time.Millisecond) | ||
c := main.NewConfig() | ||
g := main.Graphite{ | ||
Enabled: true, | ||
Database: "graphite", | ||
Protocol: "TCP", | ||
} | ||
c.Graphites = append(c.Graphites, g) | ||
|
||
t.Logf("Graphite Connection String: %s\n", g.ConnectionString(c.BindAddress)) | ||
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c) | ||
|
||
createDatabase(t, testName, nodes, "graphite") | ||
createRetentionPolicy(t, testName, nodes, "graphite", "raw") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does graphite know to write into There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess it's in the implementation -- and can be changed by the config params. If so, change it in the config params so that config value is tested as being effective. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The test suite does this for us by creating all retention policies as https://github.com/influxdb/influxdb/blob/start-graphite/cmd/influxd/server_integration_test.go#L152 Nothing magical going on here :-) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. D'oh. Of course. I do see a need for a different RP for graphite, though. Another time. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That is one of the next PR's I'm going to work on: |
||
|
||
// Connect to the graphite endpoint we just spun up | ||
conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress)) | ||
if err != nil { | ||
t.Fatal(err) | ||
return | ||
} | ||
|
||
t.Log("Writing data") | ||
data := []byte(`cpu 23.456 `) | ||
data = append(data, []byte(fmt.Sprintf("%d", now.UnixNano()/1000000))...) | ||
data = append(data, '\n') | ||
_, err = conn.Write(data) | ||
conn.Close() | ||
if err != nil { | ||
t.Fatal(err) | ||
return | ||
} | ||
|
||
expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano)) | ||
|
||
// query and wait for results | ||
got, ok := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, 2*time.Second) | ||
if !ok { | ||
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got) | ||
} | ||
} | ||
|
||
// helper funcs | ||
|
||
func errToString(err error) string { | ||
|
@@ -883,4 +968,5 @@ func mustMarshalJSON(v interface{}) string { | |
panic(e) | ||
} | ||
return string(b) | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see no reason why this can't be 10 milliseconds.