Skip to content

Commit

Permalink
Merge pull request #553 from mreiferson/test_cleanup_553
Browse files Browse the repository at this point in the history
Tests/benchmarks should clean up after themselves
  • Loading branch information
jehiah committed May 1, 2015
2 parents a2a5938 + f5f1c91 commit 9e74e8c
Show file tree
Hide file tree
Showing 7 changed files with 142 additions and 17 deletions.
6 changes: 6 additions & 0 deletions nsqd/channel_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package nsqd

import (
"os"
"strconv"
"testing"
"time"
Expand All @@ -11,6 +12,7 @@ func TestPutMessage(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_put_message" + strconv.Itoa(int(time.Now().Unix()))
Expand All @@ -31,6 +33,7 @@ func TestPutMessage2Chan(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_put_message_2chan" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -58,6 +61,7 @@ func TestInFlightWorker(t *testing.T) {
opts.Logger = newTestLogger(t)
opts.MsgTimeout = 100 * time.Millisecond
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_in_flight_worker" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -98,6 +102,7 @@ func TestChannelEmpty(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_channel_empty" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -130,6 +135,7 @@ func TestChannelEmptyConsumer(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
tcpAddr, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

conn, _ := mustConnectNSQD(tcpAddr)
Expand Down
69 changes: 58 additions & 11 deletions nsqd/diskqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package nsqd

import (
"bufio"
"fmt"
"io/ioutil"
"os"
"path"
"strconv"
Expand All @@ -15,12 +17,17 @@ func TestDiskQueue(t *testing.T) {
l := newTestLogger(t)

dqName := "test_disk_queue" + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 1024, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1024, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

msg := []byte("test")
err := dq.Put(msg)
err = dq.Put(msg)
equal(t, err, nil)
equal(t, dq.Depth(), int64(1))

Expand All @@ -31,7 +38,12 @@ func TestDiskQueue(t *testing.T) {
func TestDiskQueueRoll(t *testing.T) {
l := newTestLogger(t)
dqName := "test_disk_queue_roll" + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 100, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 100, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

Expand All @@ -55,7 +67,12 @@ func assertFileNotExist(t *testing.T, fn string) {
func TestDiskQueueEmpty(t *testing.T) {
l := newTestLogger(t)
dqName := "test_disk_queue_empty" + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 100, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 100, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

Expand Down Expand Up @@ -118,7 +135,12 @@ func TestDiskQueueEmpty(t *testing.T) {
func TestDiskQueueCorruption(t *testing.T) {
l := newTestLogger(t)
dqName := "test_disk_queue_corruption" + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 1000, 5, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1000, 5, 2*time.Second, l)

msg := make([]byte, 123)
for i := 0; i < 25; i++ {
Expand Down Expand Up @@ -149,7 +171,12 @@ func TestDiskQueueTorture(t *testing.T) {

l := newTestLogger(t)
dqName := "test_disk_queue_torture" + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 262144, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 262144, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), int64(0))

Expand Down Expand Up @@ -190,7 +217,7 @@ func TestDiskQueueTorture(t *testing.T) {

t.Logf("restarting diskqueue")

dq = newDiskQueue(dqName, os.TempDir(), 262144, 2500, 2*time.Second, l)
dq = newDiskQueue(dqName, tmpDir, 262144, 2500, 2*time.Second, l)
nequal(t, dq, nil)
equal(t, dq.Depth(), depth)

Expand Down Expand Up @@ -233,7 +260,12 @@ func BenchmarkDiskQueuePut(b *testing.B) {
b.StopTimer()
l := newTestLogger(b)
dqName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 1024768*100, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1024768*100, 2500, 2*time.Second, l)
size := 1024
b.SetBytes(int64(size))
data := make([]byte, size)
Expand All @@ -247,7 +279,12 @@ func BenchmarkDiskQueuePut(b *testing.B) {
func BenchmarkDiskWrite(b *testing.B) {
b.StopTimer()
fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
f, _ := os.OpenFile(path.Join(os.TempDir(), fileName), os.O_RDWR|os.O_CREATE, 0600)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
f, _ := os.OpenFile(path.Join(tmpDir, fileName), os.O_RDWR|os.O_CREATE, 0600)
size := 256
b.SetBytes(int64(size))
data := make([]byte, size)
Expand All @@ -262,7 +299,12 @@ func BenchmarkDiskWrite(b *testing.B) {
func BenchmarkDiskWriteBuffered(b *testing.B) {
b.StopTimer()
fileName := "bench_disk_queue_put" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
f, _ := os.OpenFile(path.Join(os.TempDir(), fileName), os.O_RDWR|os.O_CREATE, 0600)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
f, _ := os.OpenFile(path.Join(tmpDir, fileName), os.O_RDWR|os.O_CREATE, 0600)
size := 256
b.SetBytes(int64(size))
data := make([]byte, size)
Expand All @@ -286,7 +328,12 @@ func BenchmarkDiskQueueGet(b *testing.B) {
b.StopTimer()
l := newTestLogger(b)
dqName := "bench_disk_queue_get" + strconv.Itoa(b.N) + strconv.Itoa(int(time.Now().Unix()))
dq := newDiskQueue(dqName, os.TempDir(), 1024768, 2500, 2*time.Second, l)
tmpDir, err := ioutil.TempDir("", fmt.Sprintf("nsq-test-%d", time.Now().UnixNano()))
if err != nil {
panic(err)
}
defer os.RemoveAll(tmpDir)
dq := newDiskQueue(dqName, tmpDir, 1024768, 2500, 2*time.Second, l)
for i := 0; i < b.N; i++ {
dq.Put([]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaa"))
}
Expand Down
25 changes: 20 additions & 5 deletions nsqd/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net"
"net/http"
"os"
"runtime"
"strconv"
"sync"
Expand All @@ -21,6 +22,7 @@ func TestHTTPput(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_put" + strconv.Itoa(int(time.Now().Unix()))
Expand All @@ -43,6 +45,7 @@ func TestHTTPputEmpty(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_put_empty" + strconv.Itoa(int(time.Now().Unix()))
Expand All @@ -66,6 +69,7 @@ func TestHTTPmput(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_mput" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -94,6 +98,7 @@ func TestHTTPmputEmpty(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_mput_empty" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -124,6 +129,7 @@ func TestHTTPmputBinary(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_mput_bin" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -156,6 +162,7 @@ func TestHTTPSRequire(t *testing.T) {
opts.TLSKey = "./test/certs/server.key"
opts.TLSClientAuthPolicy = "require"
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_put_req" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -201,10 +208,10 @@ func TestHTTPSRequireVerify(t *testing.T) {
opts.TLSRootCAFile = "./test/certs/ca.pem"
opts.TLSClientAuthPolicy = "require-verify"
_, httpAddr, nsqd := mustStartNSQD(opts)
httpsAddr := nsqd.httpsListener.Addr().(*net.TCPAddr)

defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

httpsAddr := nsqd.httpsListener.Addr().(*net.TCPAddr)
topicName := "test_http_put_req_verf" + strconv.Itoa(int(time.Now().Unix()))
topic := nsqd.GetTopic(topicName)

Expand Down Expand Up @@ -266,7 +273,7 @@ func TestTLSRequireVerifyExceptHTTP(t *testing.T) {
opts.TLSClientAuthPolicy = "require-verify"
opts.TLSRequired = TLSRequiredExceptHTTP
_, httpAddr, nsqd := mustStartNSQD(opts)

defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_req_verf_except_http" + strconv.Itoa(int(time.Now().Unix()))
Expand All @@ -290,6 +297,7 @@ func TestHTTPDeprecatedTopicChannel(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_topic_channel" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -386,6 +394,7 @@ func TestHTTPTransitionTopicChannel(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

client := http.Client{}
Expand Down Expand Up @@ -507,6 +516,7 @@ func TestHTTPV1TopicChannel(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

topicName := "test_http_topic_channel2" + strconv.Itoa(int(time.Now().Unix()))
Expand Down Expand Up @@ -614,6 +624,7 @@ func BenchmarkHTTPput(b *testing.B) {
opts.Logger = newTestLogger(b)
opts.MemQueueSize = int64(b.N)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
msg := make([]byte, 256)
topicName := "bench_http_put" + strconv.Itoa(int(time.Now().Unix()))
url := fmt.Sprintf("http://%s/put?topic=%s", httpAddr, topicName)
Expand Down Expand Up @@ -653,9 +664,11 @@ func TestHTTPgetStatusJSON(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

nsqd.startTime = testTime
expectedJSON := fmt.Sprintf(`{"status_code":200,"status_txt":"OK","data":{"version":"%v","health":"OK","start_time":%v,"topics":[]}}`, version.Binary, testTime.Unix())
defer nsqd.Exit()

url := fmt.Sprintf("http://%s/stats?format=json", httpAddr)
resp, err := http.Get(url)
Expand All @@ -671,9 +684,11 @@ func TestHTTPgetStatusText(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, httpAddr, nsqd := mustStartNSQD(opts)
nsqd.startTime = testTime
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

nsqd.startTime = testTime

url := fmt.Sprintf("http://%s/stats?format=text", httpAddr)
resp, err := http.Get(url)
equal(t, err, nil)
Expand Down
7 changes: 7 additions & 0 deletions nsqd/nsqd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package nsqd
import (
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -81,6 +82,9 @@ func TestStartup(t *testing.T) {
opts.MemQueueSize = 100
opts.MaxBytesPerFile = 10240
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)

origDataPath := opts.DataPath

topicName := "nsqd_test" + strconv.Itoa(int(time.Now().Unix()))

Expand Down Expand Up @@ -147,6 +151,7 @@ func TestStartup(t *testing.T) {
opts.Logger = newTestLogger(t)
opts.MemQueueSize = 100
opts.MaxBytesPerFile = 10240
opts.DataPath = origDataPath
_, _, nsqd = mustStartNSQD(opts)

go func() {
Expand Down Expand Up @@ -190,6 +195,7 @@ func TestEphemeralTopicsAndChannels(t *testing.T) {
opts.Logger = newTestLogger(t)
opts.MemQueueSize = 100
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)

topicName := "ephemeral_topic" + strconv.Itoa(int(time.Now().Unix())) + "#ephemeral"
doneExitChan := make(chan int)
Expand Down Expand Up @@ -240,6 +246,7 @@ func TestPauseMetadata(t *testing.T) {
opts := NewNSQDOptions()
opts.Logger = newTestLogger(t)
_, _, nsqd := mustStartNSQD(opts)
defer os.RemoveAll(opts.DataPath)
defer nsqd.Exit()

// avoid concurrency issue of async PersistMetadata() calls
Expand Down
Loading

0 comments on commit 9e74e8c

Please sign in to comment.