Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Graphite Integration Test #1758

Merged
merged 8 commits into from
Mar 11, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions cmd/influxd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,14 @@ func Run(config *Config, join, version string, logWriter *os.File) (*messaging.B
// We want to make sure we are spun up before we exit this function, so we manually listen and serve
listener, err := net.Listen("tcp", config.BrokerAddr())
if err != nil {
log.Fatal(err)
log.Fatalf("Broker failed to listen on %s. %s ", config.BrokerAddr(), err)
}
go func() { log.Fatal(http.Serve(listener, h)) }()
go func() {
err := http.Serve(listener, h)
if err != nil {
log.Fatalf("Broker failed to server on %s.: %s", config.BrokerAddr(), err)
}
}()
log.Printf("broker listening on %s", config.BrokerAddr())

// have it occasionally tell a data node in the cluster to run continuous queries
Expand Down
104 changes: 95 additions & 9 deletions cmd/influxd/server_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"math"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -66,7 +69,7 @@ type Cluster []*Node
// the testing is marked as failed.
//
// This function returns a slice of nodes, the first of which will be the leader.
func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, basePort int) Cluster {
func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, basePort int, baseConfig *main.Config) Cluster {
t.Logf("Creating cluster of %d nodes for test %s", nNodes, testName)
if nNodes < 1 {
t.Fatalf("Test %s: asked to create nonsense cluster", testName)
Expand All @@ -85,7 +88,10 @@ func createCombinedNodeCluster(t *testing.T, testName, tmpDir string, nNodes, ba
_ = os.RemoveAll(tmpDataDir)

// Create the first node, special case.
c := main.NewConfig()
c := baseConfig
if c == nil {
c = main.NewConfig()
}
c.Broker.Dir = filepath.Join(tmpBrokerDir, strconv.Itoa(basePort))
c.Data.Dir = filepath.Join(tmpDataDir, strconv.Itoa(basePort))
c.Broker.Port = basePort
Expand Down Expand Up @@ -167,9 +173,6 @@ func write(t *testing.T, node *Node, data string) {
body, _ := ioutil.ReadAll(resp.Body)
t.Fatalf("Write to database failed. Unexpected status code. expected: %d, actual %d, %s", http.StatusOK, resp.StatusCode, string(body))
}

// Until races are solved.
time.Sleep(3 * time.Second)
}

// query executes the given query against all nodes in the cluster, and verifies no errors occured, and
Expand Down Expand Up @@ -202,6 +205,38 @@ func query(t *testing.T, nodes Cluster, urlDb, query, expected string) (string,
return "", true
}

// queryAndWait executes the given query against all nodes in the cluster, and verifies no errors occured, and
// ensures the returned data is as expected until the timeout occurs
func queryAndWait(t *testing.T, nodes Cluster, urlDb, q, expected string, timeout time.Duration) (string, bool) {
v := url.Values{"q": []string{q}}
if urlDb != "" {
v.Set("db", urlDb)
}

var (
timedOut int32
timer = time.NewTimer(time.Duration(math.MaxInt64))
)
defer timer.Stop()
if timeout > 0 {
timer.Reset(time.Duration(timeout) * time.Millisecond)
go func() {
<-timer.C
atomic.StoreInt32(&timedOut, 1)
}()
}

for {
if got, ok := query(t, nodes, urlDb, q, expected); ok {
return got, ok
} else if atomic.LoadInt32(&timedOut) == 1 {
return fmt.Sprintf("timed out before expected result was found: got: %s", got), false
} else {
time.Sleep(10 * time.Millisecond)
}
}
}

// runTests_Errors tests some basic error cases.
func runTests_Errors(t *testing.T, nodes Cluster) {
t.Logf("Running tests against %d-node cluster", len(nodes))
Expand Down Expand Up @@ -744,7 +779,7 @@ func runTestsData(t *testing.T, testName string, nodes Cluster, database, retent
if tt.queryDb != "" {
urlDb = tt.queryDb
}
got, ok := query(t, nodes, rewriteDbRp(urlDb, database, retention), rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention))
got, ok := queryAndWait(t, nodes, rewriteDbRp(urlDb, database, retention), rewriteDbRp(tt.query, database, retention), rewriteDbRp(tt.expected, database, retention), 3*time.Second)
if !ok {
t.Errorf("Test \"%s\" failed\n exp: %s\n got: %s\n", name, rewriteDbRp(tt.expected, database, retention), got)
}
Expand All @@ -762,7 +797,7 @@ func TestSingleServer(t *testing.T) {
os.RemoveAll(dir)
}()

nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090)
nodes := createCombinedNodeCluster(t, testName, dir, 1, 8090, nil)

runTestsData(t, testName, nodes, "mydb", "myrp")
}
Expand All @@ -778,7 +813,7 @@ func Test3NodeServer(t *testing.T) {
os.RemoveAll(dir)
}()

nodes := createCombinedNodeCluster(t, testName, dir, 3, 8190)
nodes := createCombinedNodeCluster(t, testName, dir, 3, 8190, nil)

runTestsData(t, testName, nodes, "mydb", "myrp")
}
Expand All @@ -797,7 +832,7 @@ func TestClientLibrary(t *testing.T) {
retentionPolicy := "myrp"
now := time.Now().UTC()

nodes := createCombinedNodeCluster(t, testName, dir, 1, 8290)
nodes := createCombinedNodeCluster(t, testName, dir, 1, 8290, nil)
createDatabase(t, testName, nodes, database)
createRetentionPolicy(t, testName, nodes, database, retentionPolicy)

Expand Down Expand Up @@ -868,6 +903,56 @@ func TestClientLibrary(t *testing.T) {
}
}

func Test_ServerSingleGraphiteIntegration(t *testing.T) {
if testing.Short() {
t.Skip()
}
nNodes := 1
basePort := 8390
testName := "graphite integration"
dir := tempfile()
now := time.Now().UTC().Round(time.Millisecond)
c := main.NewConfig()
g := main.Graphite{
Enabled: true,
Database: "graphite",
Protocol: "TCP",
}
c.Graphites = append(c.Graphites, g)

t.Logf("Graphite Connection String: %s\n", g.ConnectionString(c.BindAddress))
nodes := createCombinedNodeCluster(t, testName, dir, nNodes, basePort, c)

createDatabase(t, testName, nodes, "graphite")
createRetentionPolicy(t, testName, nodes, "graphite", "raw")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does graphite know to write into raw?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess it's in the implementation -- and can be changed by the config params. If so, change it in the config params so that config value is tested as being effective.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test suite does this for us by creating all retention policies as DEFAULT:

https://github.com/influxdb/influxdb/blob/start-graphite/cmd/influxd/server_integration_test.go#L152

Nothing magical going on here :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

D'oh. Of course. I do see a need for a different RP for graphite, though. Another time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is one of the next PR's I'm going to work on:

#1745


// Connect to the graphite endpoint we just spun up
conn, err := net.Dial("tcp", g.ConnectionString(c.BindAddress))
if err != nil {
t.Fatal(err)
return
}

t.Log("Writing data")
data := []byte(`cpu 23.456 `)
data = append(data, []byte(fmt.Sprintf("%d", now.UnixNano()/1000000))...)
data = append(data, '\n')
_, err = conn.Write(data)
conn.Close()
if err != nil {
t.Fatal(err)
return
}

expected := fmt.Sprintf(`{"results":[{"series":[{"name":"cpu","columns":["time","cpu"],"values":[["%s",23.456]]}]}]}`, now.Format(time.RFC3339Nano))

// query and wait for results
got, ok := queryAndWait(t, nodes, "graphite", `select * from "graphite"."raw".cpu`, expected, 2*time.Second)
if !ok {
t.Errorf(`Test "%s" failed, expected: %s, got: %s`, testName, expected, got)
}
}

// helper funcs

func errToString(err error) string {
Expand All @@ -883,4 +968,5 @@ func mustMarshalJSON(v interface{}) string {
panic(e)
}
return string(b)

}