This repository has been archived by the owner on Aug 23, 2023. It is now read-only.

Commit

Merge pull request #95 from raintank/issue94
Refactor how chunks are persisted.
Anthony Woods committed Jan 5, 2016
2 parents 34f7efc + 4577891 commit dd6f106
Showing 7 changed files with 246 additions and 141 deletions.
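
Only aggmetric.go, aggmetric_test.go and aggmetrics.go are shown below. They reference a ChunkWriteRequest type and a global CassandraWriteQueue that are defined in the other changed files; the following is a minimal sketch reconstructed from how those names are used in this diff (field names taken from persist() below, the exact definitions may differ):

// Sketch only, inferred from usage in persist() and the benchmarks below.
type ChunkWriteRequest struct {
	key       string    // metric key the chunk belongs to
	chunk     *Chunk    // chunk to persist to cassandra
	ttl       uint32    // TTL (in seconds) applied to the cassandra write
	timestamp time.Time // when the write was queued, useful for latency stats
}

// One shared queue for all metrics, drained by processWriteQueue workers.
// This replaces the per-metric writeQueue channel and its activeWrite goroutine.
var CassandraWriteQueue chan *ChunkWriteRequest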
132 changes: 25 additions & 107 deletions metric_tank/aggmetric.go
@@ -1,8 +1,6 @@
package main

import (
"bytes"
"encoding/binary"
"fmt"
"math"
"sync"
@@ -27,9 +25,8 @@ type AggMetric struct {
ChunkSpan uint32 // span of individual chunks in seconds
Chunks []*Chunk
aggregators []*Aggregator
writeQueue chan *Chunk
activeWrite bool
firstChunkT0 uint32
ttl uint32
}

// re-order the chunks with the oldest at start of the list and newest at the end.
@@ -65,16 +62,17 @@ func (a *AggMetric) GrowNumChunks(numChunks uint32) {

// NewAggMetric creates a metric with given key, it retains the given number of chunks each chunkSpan seconds long
// it optionally also creates aggregations with the given settings
func NewAggMetric(key string, chunkSpan, numChunks uint32, maxDirtyChunks uint32, aggsetting ...aggSetting) *AggMetric {
func NewAggMetric(key string, chunkSpan, numChunks uint32, ttl uint32, aggsetting ...aggSetting) *AggMetric {
m := AggMetric{
Key: key,
ChunkSpan: chunkSpan,
NumChunks: numChunks,
Chunks: make([]*Chunk, 0, numChunks),
writeQueue: make(chan *Chunk, maxDirtyChunks),
Key: key,
ChunkSpan: chunkSpan,
NumChunks: numChunks,
Chunks: make([]*Chunk, 0, numChunks),
ttl: ttl,
}
for _, as := range aggsetting {
m.aggregators = append(m.aggregators, NewAggregator(key, as.span, as.chunkSpan, as.numChunks, maxDirtyChunks))
//TODO(awoods): use per aggsetting TTL
m.aggregators = append(m.aggregators, NewAggregator(key, as.span, as.chunkSpan, as.numChunks, ttl))
}

return &m
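
NewAggMetric now takes the chunk TTL where it previously took the write-queue size. A call under the new signature might look like this (values purely illustrative):

// illustrative only: 10-minute chunks, 5 chunks kept in memory, 1-day TTL in cassandra
m := NewAggMetric("some.metric.key", 600, 5, 86400, aggSettings...)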
@@ -349,9 +347,14 @@ func (a *AggMetric) persist(pos int) {
}

// create an array of chunks that need to be sent to the writeQueue.
pending := make([]*Chunk, 1)
pending := make([]*ChunkWriteRequest, 1)
// add the current chunk to the list of chunks to send to the writeQueue
pending[0] = chunk
pending[0] = &ChunkWriteRequest{
key: a.Key,
chunk: chunk,
ttl: a.ttl,
timestamp: time.Now(),
}

// if we recently became the primary, there may be older chunks
// that the old primary did not save. We should check for those
@@ -363,20 +366,19 @@ func (a *AggMetric) persist(pos int) {
previousChunk := a.Chunks[previousPos]
for (previousChunk.T0 < chunk.T0) && !previousChunk.Saved && !previousChunk.Saving {
log.Debug("old chunk needs saving. Adding %s:%d to writeQueue", a.Key, previousChunk.T0)
pending = append(pending, previousChunk)
pending = append(pending, &ChunkWriteRequest{
key: a.Key,
chunk: previousChunk,
ttl: a.ttl,
timestamp: time.Now(),
})
previousPos--
if previousPos < 0 {
previousPos += len(a.Chunks)
}
previousChunk = a.Chunks[previousPos]
}

if len(pending) > cap(a.writeQueue) {
// this can lead to a deadlock. so lets just write what we
// can and handle the rest next time.
pending = pending[len(pending)-cap(a.writeQueue):]
}

log.Debug("sending %d chunks to write queue", len(pending))

ticker := time.NewTicker(2 * time.Second)
@@ -392,99 +394,15 @@ func (a *AggMetric) persist(pos int) {
// before newer data.
for pendingChunk >= 0 {
select {
case a.writeQueue <- pending[pendingChunk]:
pending[pendingChunk].Saving = true
case CassandraWriteQueue <- pending[pendingChunk]:
pending[pendingChunk].chunk.Saving = true
pendingChunk--
log.Debug("chunk in write queue: length: %d", len(a.writeQueue))
log.Debug("chunk in write queue: length: %d", len(CassandraWriteQueue))
case <-ticker.C:
log.Warn("%s:%d blocked pushing to writeQueue.", a.Key, chunk.T0)
}
}
ticker.Stop()

// If there is already a goroutine running that is consuming from our
// writeQueue, then we don't need to do anything further. Otherwise,
// we need to start a new goroutine to consume from the writeQueue.
// Because we still hold the lock when we check activeWrite, there is
// no risk that any existing goroutine is going to exit without first
// processing the chunk we just added to the queue.
if !a.activeWrite {
log.Debug("starting persist goroutine.")
a.activeWrite = true
// asynchronously write data to cassandra. Because we hold the lock
// when starting this goroutine, we are assured that only 1 goroutine
// is ever running and there will always be at least 1 chunk in the
// writeQueue.
go func() {
for {
select {
case c := <-a.writeQueue:
log.Debug("starting to save %s:%d %v", a.Key, c.T0, c)
data := c.Series.Bytes()
chunkSizeAtSave.Value(int64(len(data)))
version := FormatStandardGoTsz
buf := new(bytes.Buffer)
err := binary.Write(buf, binary.LittleEndian, uint8(version))
if err != nil {
// TODO
}
_, err = buf.Write(data)
if err != nil {
// TODO
}
success := false
attempts := 0
for !success {
err := InsertChunk(a.Key, c.T0, buf.Bytes(), *metricTTL)
if err == nil {
success = true
go func() {
a.Lock()
c.Saved = true
a.Unlock()
msg := &PersistMessage{
Instance: *instance,
Key: a.Key,
T0: c.T0,
}
msg.Send()
}()
log.Debug("save complete. %s:%d %v", a.Key, c.T0, c)
chunkSaveOk.Inc(1)
} else {
if (attempts % 20) == 0 {
log.Warn("failed to save chunk to cassandra after %d attempts. %v, %s", attempts+1, c, err)
}
chunkSaveFail.Inc(1)
sleepTime := 100 * attempts
if sleepTime > 2000 {
sleepTime = 2000
}
time.Sleep(time.Duration(sleepTime) * time.Millisecond)
attempts++
}
}
default:
log.Debug("waiting for lock: %s", a.Key)
a.Lock()
// this will typically be 0. Though there is a possibility for a chunk to be added
// to the writeQueue before the lock is obtained.
// If a call to aggMetric.Add() is running it will already be holding the lock. We
// will then block trying to acquire the lock until after aggMetric.Add() completes.
// If aggMetric.Add() calls persist() another chunk will be added to the writeQueue
// increasing its length to 1.
if len(a.writeQueue) == 0 {
log.Debug("no items in writeQueue, terminating write goroutine for %s.", a.Key)
a.activeWrite = false
a.Unlock()
return
}
log.Debug("still pending writes in queue for %s", a.Key)
a.Unlock()
}
}
}()
}
return
}
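
The per-metric write goroutine deleted above is replaced by shared worker goroutines; the benchmarks below start one processWriteQueue() per unit of cassandraWriteConcurrency. Its body lives in one of the other changed files and is not shown here; a plausible sketch, carried over from the save loop removed above (the serialization format, retry/backoff, Saved flag and PersistMessage bookkeeping are assumptions based on that code, and the InsertChunk signature is assumed from the old call site):

func processWriteQueue() {
	for cwr := range CassandraWriteQueue {
		// serialize the chunk: one format-version byte followed by the tsz payload
		data := cwr.chunk.Series.Bytes()
		chunkSizeAtSave.Value(int64(len(data)))
		buf := new(bytes.Buffer)
		binary.Write(buf, binary.LittleEndian, uint8(FormatStandardGoTsz))
		buf.Write(data)

		// retry until cassandra accepts the write, backing off up to 2s between attempts
		for attempts := 0; ; attempts++ {
			err := InsertChunk(cwr.key, cwr.chunk.T0, buf.Bytes(), int(cwr.ttl))
			if err == nil {
				cwr.chunk.Saved = true // locking around this is omitted in the sketch
				// notify the other instances that this chunk is persisted
				msg := &PersistMessage{Instance: *instance, Key: cwr.key, T0: cwr.chunk.T0}
				msg.Send()
				chunkSaveOk.Inc(1)
				break
			}
			chunkSaveFail.Inc(1)
			sleep := 100 * attempts
			if sleep > 2000 {
				sleep = 2000
			}
			time.Sleep(time.Duration(sleep) * time.Millisecond)
		}
	}
}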

133 changes: 131 additions & 2 deletions metric_tank/aggmetric_test.go
@@ -150,13 +150,14 @@ func TestAggMetric(t *testing.T) {
func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) {
stats, _ := helper.New(false, "", "standard", "metrics_tank", "")
initMetrics(stats)
clusterStatus = NewClusterStatus("default", false)
// we will store 10s metrics in 5 chunks of 2 hours
// aggregate them in 5min buckets, stored in 1 chunk of 24 hours
chunkSpan := uint32(2 * 3600)
numChunks := uint32(5)
chunkMaxStale := uint32(3600)
metricMaxStale := uint32(21600)
maxDirtyChunks := uint32(1)
ttl := uint32(84600)
aggSettings := []aggSetting{
{
span: uint32(300),
@@ -170,7 +171,7 @@ func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(chunkSpan, numChunks, chunkMaxStale, metricMaxStale, maxDirtyChunks, aggSettings)
metrics := NewAggMetrics(chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, aggSettings)

maxT := 3600 * 24 * uint32(b.N) // b.N in days
for t := uint32(1); t < maxT; t += 10 {
@@ -180,3 +181,131 @@ func BenchmarkAggMetrics1000Metrics1Day(b *testing.B) {
}
}
}

func BenchmarkAggMetrics1kSeries2Chunks1kQueueSize(b *testing.B) {
stats, _ := helper.New(false, "", "standard", "metrics_tank", "")
initMetrics(stats)

chunkSpan := uint32(600)
numChunks := uint32(5)
chunkMaxStale := uint32(3600)
metricMaxStale := uint32(21600)

*topicNotifyPersist = ""
*cassandraWriteConcurrency = b.N
clusterStatus = NewClusterStatus("default", true)
CassandraWriteQueue = make(chan *ChunkWriteRequest, 1000)
for i := 0; i < b.N; i++ {
go processWriteQueue()
}

ttl := uint32(84600)
aggSettings := []aggSetting{
{
span: uint32(300),
chunkSpan: uint32(24 * 3600),
numChunks: uint32(2),
},
}

keys := make([]string, 1000)
for i := 0; i < 1000; i++ {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, aggSettings)

maxT := uint32(1200)
for t := uint32(1); t < maxT; t += 10 {
for metricI := 0; metricI < 1000; metricI++ {
m := metrics.GetOrCreate(keys[metricI])
m.Add(t, float64(t))
}
}
}

func BenchmarkAggMetrics10kSeries2Chunks10kQueueSize(b *testing.B) {
stats, _ := helper.New(false, "", "standard", "metrics_tank", "")
initMetrics(stats)

chunkSpan := uint32(600)
numChunks := uint32(5)
chunkMaxStale := uint32(3600)
metricMaxStale := uint32(21600)

*topicNotifyPersist = ""
*cassandraWriteConcurrency = b.N

clusterStatus = NewClusterStatus("default", true)
CassandraWriteQueue = make(chan *ChunkWriteRequest, 10000)
for i := 0; i < b.N; i++ {
go processWriteQueue()
}

ttl := uint32(84600)
aggSettings := []aggSetting{
{
span: uint32(300),
chunkSpan: uint32(24 * 3600),
numChunks: uint32(2),
},
}

keys := make([]string, 10000)
for i := 0; i < 10000; i++ {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, aggSettings)

maxT := uint32(1200)
for t := uint32(1); t < maxT; t += 10 {
for metricI := 0; metricI < 10000; metricI++ {
m := metrics.GetOrCreate(keys[metricI])
m.Add(t, float64(t))
}
}
}

func BenchmarkAggMetrics100kSeries2Chunks100kQueueSize(b *testing.B) {
stats, _ := helper.New(false, "", "standard", "metrics_tank", "")
initMetrics(stats)

chunkSpan := uint32(600)
numChunks := uint32(5)
chunkMaxStale := uint32(3600)
metricMaxStale := uint32(21600)

*topicNotifyPersist = ""
*cassandraWriteConcurrency = b.N

clusterStatus = NewClusterStatus("default", true)
CassandraWriteQueue = make(chan *ChunkWriteRequest, 100000)
for i := 0; i < b.N; i++ {
go processWriteQueue()
}

ttl := uint32(84600)
aggSettings := []aggSetting{
{
span: uint32(300),
chunkSpan: uint32(24 * 3600),
numChunks: uint32(2),
},
}

keys := make([]string, 100000)
for i := 0; i < 100000; i++ {
keys[i] = fmt.Sprintf("hello.this.is.a.test.key.%d", i)
}

metrics := NewAggMetrics(chunkSpan, numChunks, chunkMaxStale, metricMaxStale, ttl, aggSettings)

maxT := uint32(1200)
for t := uint32(1); t < maxT; t += 10 {
for metricI := 0; metricI < 100000; metricI++ {
m := metrics.GetOrCreate(keys[metricI])
m.Add(t, float64(t))
}
}
}
8 changes: 4 additions & 4 deletions metric_tank/aggmetrics.go
@@ -15,7 +15,7 @@ type AggMetrics struct {
aggSettings []aggSetting // for now we apply the same settings to all AggMetrics. later we may want to have different settings.
chunkMaxStale uint32
metricMaxStale uint32
maxDirtyChunks uint32
ttl uint32
}

var totalPoints chan int
@@ -25,15 +25,15 @@ func init() {
totalPoints = make(chan int, 1000)
}

func NewAggMetrics(chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, maxDirtyChunks uint32, aggSettings []aggSetting) *AggMetrics {
func NewAggMetrics(chunkSpan, numChunks, chunkMaxStale, metricMaxStale uint32, ttl uint32, aggSettings []aggSetting) *AggMetrics {
ms := AggMetrics{
Metrics: make(map[string]*AggMetric),
chunkSpan: chunkSpan,
numChunks: numChunks,
aggSettings: aggSettings,
chunkMaxStale: chunkMaxStale,
metricMaxStale: metricMaxStale,
maxDirtyChunks: maxDirtyChunks,
ttl: ttl,
}

go ms.stats()
@@ -94,7 +94,7 @@ func (ms *AggMetrics) GetOrCreate(key string) Metric {
ms.Lock()
m, ok := ms.Metrics[key]
if !ok {
m = NewAggMetric(key, ms.chunkSpan, ms.numChunks, ms.maxDirtyChunks, ms.aggSettings...)
m = NewAggMetric(key, ms.chunkSpan, ms.numChunks, ms.ttl, ms.aggSettings...)
ms.Metrics[key] = m
}
ms.Unlock()
