Skip to content

Commit

Permalink
Merge pull request #9456 from influxdata/sgc-logging
Browse files Browse the repository at this point in the history
Generate trace logs for a number of important InfluxDB operations
  • Loading branch information
stuartcarnie authored Feb 21, 2018
2 parents 448a114 + d135aec commit d40d3ec
Show file tree
Hide file tree
Showing 20 changed files with 453 additions and 116 deletions.
4 changes: 2 additions & 2 deletions cmd/influx_inspect/buildtsi/buildtsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (cmd *Command) processDatabase(dbName, dataDir, walDir string) error {
}

func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpName, dataDir, walDir string) error {
cmd.Logger.Info("rebuilding retention policy", zap.String("db", dbName), zap.String("rp", rpName))
cmd.Logger.Info("rebuilding retention policy", logger.Database(dbName), logger.RetentionPolicy(rpName))

fis, err := ioutil.ReadDir(dataDir)
if err != nil {
Expand All @@ -142,7 +142,7 @@ func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpNam
}

func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string, shardID uint64, dataDir, walDir string) error {
cmd.Logger.Info("rebuilding shard", zap.String("db", dbName), zap.String("rp", rpName), zap.Uint64("shard", shardID))
cmd.Logger.Info("rebuilding shard", logger.Database(dbName), logger.RetentionPolicy(rpName), logger.Shard(shardID))

// Check if shard already has a TSI index.
indexPath := filepath.Join(dataDir, "index")
Expand Down
111 changes: 111 additions & 0 deletions logger/fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package logger

import (
"time"

"github.com/influxdata/influxdb/pkg/snowflake"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const (
	// TraceIDKey is the logging context key used for identifying unique traces.
	TraceIDKey = "trace_id"

	// OperationNameKey is the logging context key used for identifying name of an operation.
	OperationNameKey = "op.name"

	// OperationEventKey is the logging context key used for identifying a notable
	// event during the course of an operation.
	OperationEventKey = "op.event"

	// OperationElapsedKey is the logging context key used for identifying time elapsed to finish an operation.
	OperationElapsedKey = "op.elapsed"

	// DBInstanceKey is the logging context key used for identifying name of the relevant database.
	DBInstanceKey = "db.instance"

	// DBRetentionKey is the logging context key used for identifying name of the relevant retention policy.
	DBRetentionKey = "db.rp"

	// DBShardGroupKey is the logging context key used for identifying relevant shard group.
	DBShardGroupKey = "db.shard_group"

	// DBShardIDKey is the logging context key used for identifying the relevant shard.
	DBShardIDKey = "db.shard_id"
)

// Values recorded under OperationEventKey to mark the start and end of an
// operation (see OperationEventStart/OperationEventEnd and NewOperation).
const (
	eventStart = "start"
	eventEnd   = "end"
)

var (
	// gen is the package-level snowflake generator used for trace and log
	// identifiers; it is constructed with machine id 0.
	gen = snowflake.New(0)
)

// nextID returns a new unique, roughly time-sortable identifier string.
func nextID() string {
	return gen.NextString()
}

// TraceID returns a field recording id as the unique trace identifier.
func TraceID(id string) zapcore.Field {
	f := zap.String(TraceIDKey, id)
	return f
}

// OperationName returns a field recording name as the operation's name.
func OperationName(name string) zapcore.Field {
	f := zap.String(OperationNameKey, name)
	return f
}

// OperationElapsed returns a field recording d as the operation's duration.
func OperationElapsed(d time.Duration) zapcore.Field {
	f := zap.Duration(OperationElapsedKey, d)
	return f
}

// OperationEventStart returns a field marking the start of an operation.
func OperationEventStart() zapcore.Field {
	f := zap.String(OperationEventKey, eventStart)
	return f
}

// OperationEventEnd returns a field for tracking the end of an operation.
func OperationEventEnd() zapcore.Field {
	return zap.String(OperationEventKey, eventEnd)
}

// Database returns a field recording name as the relevant database.
func Database(name string) zapcore.Field {
	f := zap.String(DBInstanceKey, name)
	return f
}

// RetentionPolicy returns a field for tracking the name of a retention policy.
func RetentionPolicy(name string) zapcore.Field {
	return zap.String(DBRetentionKey, name)
}

// ShardGroup returns a field recording id as the shard group identifier.
func ShardGroup(id uint64) zapcore.Field {
	f := zap.Uint64(DBShardGroupKey, id)
	return f
}

// Shard returns a field recording id as the shard identifier.
func Shard(id uint64) zapcore.Field {
	f := zap.Uint64(DBShardIDKey, id)
	return f
}

// NewOperation uses the existing log to create a new logger with context
// containing a trace id and the operation. Prior to returning, a standardized message
// is logged indicating the operation has started. The returned function should be
// called when the operation concludes in order to log a corresponding message which
// includes an elapsed time and that the operation has ended.
func NewOperation(log *zap.Logger, msg, name string, fields ...zapcore.Field) (*zap.Logger, func()) {
	// Every operation carries a fresh trace id and its name first; any
	// caller-supplied fields follow. Preallocate to avoid regrowth.
	f := make([]zapcore.Field, 0, 2+len(fields))
	f = append(f, TraceID(nextID()), OperationName(name))
	f = append(f, fields...)

	now := time.Now()
	log = log.With(f...)
	log.Info(msg+" (start)", OperationEventStart())

	return log, func() { log.Info(msg+" (end)", OperationEventEnd(), OperationElapsed(time.Since(now))) }
}
2 changes: 1 addition & 1 deletion logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) {
encoder,
zapcore.Lock(zapcore.AddSync(w)),
c.Level,
)), nil
), zap.Fields(zap.String("log_id", nextID()))), nil
}

func newEncoder(format string) (zapcore.Encoder, error) {
Expand Down
4 changes: 2 additions & 2 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (m *Monitor) createInternalStorage() {
}

if _, err := m.MetaClient.CreateDatabaseWithRetentionPolicy(m.storeDatabase, &spec); err != nil {
m.Logger.Info("Failed to create storage", zap.String("db", m.storeDatabase), zap.Error(err))
m.Logger.Info("Failed to create storage", logger.Database(m.storeDatabase), zap.Error(err))
return
}
}
Expand All @@ -417,7 +417,7 @@ func (m *Monitor) waitUntilInterval(d time.Duration) error {
// storeStatistics writes the statistics to an InfluxDB system.
func (m *Monitor) storeStatistics() {
defer m.wg.Done()
m.Logger.Info("Storing statistics", zap.String("db", m.storeDatabase), zap.String("rp", m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval))
m.Logger.Info("Storing statistics", logger.Database(m.storeDatabase), logger.RetentionPolicy(m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval))

// Wait until an even interval to start recording monitor statistics.
// If we are interrupted before the interval for some reason, exit early.
Expand Down
38 changes: 38 additions & 0 deletions pkg/snowflake/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
Snowflake ID generator
======================

This is a Go implementation of [Twitter Snowflake](https://blog.twitter.com/2010/announcing-snowflake).

The most useful aspect of these IDs is they are _roughly_ sortable and when generated
at roughly the same time, should have values in close proximity to each other.

IDs
---

Each id is a 64-bit number, structured as follows:


```
6 6 5 4 3 2 1
3210987654321098765432109876543210987654321098765432109876543210
ttttttttttttttttttttttttttttttttttttttttttmmmmmmmmmmssssssssssss
```

where

* s (sequence) is a 12-bit integer that increments if called multiple times for the same millisecond
* m (machine id) is a 10-bit integer representing the server id
* t (time) is a 42-bit integer representing the number of milliseconds elapsed
  since the custom epoch 1491696000000 (2017-04-09T00:00:00Z)

### String Encoding

The 64-bit unsigned integer is base-64 encoded using the following 64 URL-safe characters,
which are ordered according to their ASCII value.

```
0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz~
```

A binary sort of a list of encoded values will be correctly ordered according to the numerical representation.
107 changes: 107 additions & 0 deletions pkg/snowflake/gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package snowflake

import (
"fmt"
"sync"
"time"
)

const (
	epoch        = 1491696000000 // custom epoch in Unix ms: 2017-04-09T00:00:00Z
	serverBits   = 10            // bits reserved for the machine id
	sequenceBits = 12            // bits reserved for the per-millisecond sequence
	serverShift  = sequenceBits  // machine id sits directly above the sequence
	timeShift    = sequenceBits + serverBits // timestamp occupies the top 42 bits
	serverMax    = ^(-1 << serverBits)   // 1023: largest valid machine id
	sequenceMask = ^(-1 << sequenceBits) // 4095: wraps the sequence counter
)

// Generator produces unique, roughly time-sortable 64-bit snowflake IDs.
// Construct with New; a Generator must not be copied after first use
// because it embeds a sync.Mutex.
type Generator struct {
	rw            sync.Mutex
	lastTimestamp uint64 // millisecond timestamp the most recent ID was issued for
	machineID     int    // server id embedded in every ID (10 bits)
	sequence      int32  // per-millisecond counter (12 bits)
}

// New returns a Generator that embeds machineID in every ID it produces.
// It panics if machineID is outside the valid range [0, serverMax].
func New(machineID int) *Generator {
	if machineID < 0 || machineID > serverMax {
		// The check accepts machineID == serverMax, so the message must say ≤,
		// not < (the previous message excluded a valid value).
		panic(fmt.Errorf("invalid machine id; must be 0 ≤ id ≤ %d", serverMax))
	}
	return &Generator{
		machineID:     machineID,
		lastTimestamp: 0,
		sequence:      0,
	}
}

// MachineID returns the machine id this generator embeds in each ID.
func (g *Generator) MachineID() int {
	return g.machineID
}

// Next returns the next unique, roughly time-ordered 64-bit ID.
// It is safe for concurrent use.
func (g *Generator) Next() uint64 {
	t := now()
	g.rw.Lock()
	if t == g.lastTimestamp {
		// Same millisecond as the previous ID: bump the 12-bit sequence.
		// If it wraps to zero, block until the clock reaches the next ms.
		g.sequence = (g.sequence + 1) & sequenceMask
		if g.sequence == 0 {
			t = g.nextMillis()
		}
	} else if t < g.lastTimestamp {
		// Clock moved backwards; wait until it passes lastTimestamp so
		// previously issued IDs are never repeated.
		t = g.nextMillis()
	} else {
		// Fresh millisecond: restart the sequence.
		g.sequence = 0
	}
	g.lastTimestamp = t
	seq := g.sequence
	g.rw.Unlock()

	// Pack the fields: 42-bit timestamp (ms since epoch) | 10-bit machine id
	// | 12-bit sequence.
	tp := (t - epoch) << timeShift
	sp := uint64(g.machineID << serverShift)
	n := tp | sp | uint64(seq)

	return n
}

// NextString returns the next ID as an 11-character string that sorts
// lexicographically in the same order as the numeric ID (see encode).
func (g *Generator) NextString() string {
	var s [11]byte
	encode(&s, g.Next())
	return string(s[:])
}

// AppendNext encodes the next ID directly into s, avoiding the string
// allocation that NextString performs.
func (g *Generator) AppendNext(s *[11]byte) {
	encode(s, g.Next())
}

// nextMillis busy-waits (sleeping 100µs between probes) until the wall
// clock advances past g.lastTimestamp, then returns the new timestamp.
// It must be called with g.rw held, since it reads g.lastTimestamp.
func (g *Generator) nextMillis() uint64 {
	t := now()
	for t <= g.lastTimestamp {
		time.Sleep(100 * time.Microsecond)
		t = now()
	}
	return t
}

// now returns the current wall-clock time as Unix milliseconds.
func now() uint64 { return uint64(time.Now().UnixNano() / 1e6) }

// digits is the 64-character URL-safe alphabet used by encode. The
// characters appear in ascending ASCII order, so encoded strings sort
// the same way as the numbers they represent.
var digits = [...]byte{
	'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
	'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
	'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
	'U', 'V', 'W', 'X', 'Y', 'Z', '_', 'a', 'b', 'c',
	'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
	'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w',
	'x', 'y', 'z', '~'}

// encode writes n into s as eleven base-64 digits, most significant first.
func encode(s *[11]byte, n uint64) {
	for i := len(s) - 1; i >= 0; i-- {
		s[i] = digits[n&0x3f]
		n >>= 6
	}
}
68 changes: 68 additions & 0 deletions pkg/snowflake/gen_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package snowflake

import (
"fmt"
"math/rand"
"sort"
"testing"

"github.com/influxdata/influxdb/pkg/testing/assert"
)

// TestEncode checks the string encoding against known input/output pairs.
func TestEncode(t *testing.T) {
	cases := []struct {
		in   uint64
		want string
	}{
		{0x000, "00000000000"},
		{0x001, "00000000001"},
		{0x03f, "0000000000~"},
		{0x07f, "0000000001~"},
		{0xf07f07f07f07f07f, "F1~1~1~1~1~"},
	}
	for _, tc := range cases {
		t.Run(fmt.Sprintf("0x%03x→%s", tc.in, tc.want), func(t *testing.T) {
			var buf [11]byte
			encode(&buf, tc.in)
			assert.Equal(t, string(buf[:]), tc.want)
		})
	}
}

// TestSorting verifies that base-64 encoded numbers are ordered according to
// their numerical representation (a binary sort matches numeric order).
func TestSorting(t *testing.T) {
	var (
		vals = make([]string, 1000)
		exp  = make([]string, 1000)
	)

	for i := 0; i < len(vals); i++ {
		var s [11]byte
		encode(&s, uint64(i*47))
		vals[i] = string(s[:])
		exp[i] = string(s[:])
	}

	// randomize them, then confirm sorting restores numeric order
	shuffle(len(vals), func(i, j int) {
		vals[i], vals[j] = vals[j], vals[i]
	})

	sort.Strings(vals)
	assert.Equal(t, vals, exp)
}

// BenchmarkEncode measures the cost (and allocations) of encoding one ID.
func BenchmarkEncode(b *testing.B) {
	b.ReportAllocs()
	var buf [11]byte
	for n := 0; n < b.N; n++ {
		encode(&buf, 100)
	}
}

// shuffle pseudo-randomly permutes n elements via swap (Fisher–Yates).
func shuffle(n int, swap func(i, j int)) {
	for k := n - 1; k > 0; k-- {
		swap(k, rand.Intn(k+1))
	}
}
5 changes: 3 additions & 2 deletions services/collectd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"collectd.org/api"
"collectd.org/network"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
Expand Down Expand Up @@ -395,7 +396,7 @@ func (s *Service) writePoints() {
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Info("Required database not yet created",
zap.String("db", s.Config.Database), zap.Error(err))
logger.Database(s.Config.Database), zap.Error(err))
continue
}

Expand All @@ -404,7 +405,7 @@ func (s *Service) writePoints() {
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.Logger.Info("Failed to write point batch to database",
zap.String("db", s.Config.Database), zap.Error(err))
logger.Database(s.Config.Database), zap.Error(err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}
}
Expand Down
Loading

0 comments on commit d40d3ec

Please sign in to comment.