Skip to content

Commit

Permalink
Merge pull request #7496 from influxdata/js-filter-shards-without-series-key
Browse files Browse the repository at this point in the history

Filter out series within shards that do not have data for that series
  • Loading branch information
jsternberg authored Oct 21, 2016
2 parents 6fd74a6 + 3681bc8 commit 332de12
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 19 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
- [#7463](https://github.com/influxdata/influxdb/pull/7463): Make input plugin services open/close idempotent.
- [#7473](https://github.com/influxdata/influxdb/pull/7473): Align binary math expression streams by time.
- [#7281](https://github.com/influxdata/influxdb/pull/7281): Add stats for active compactions, compaction errors.
- [#7496](https://github.com/influxdata/influxdb/pull/7496): Filter out series within shards that do not have data for that series.

### Bugfixes

Expand Down
9 changes: 5 additions & 4 deletions tsdb/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ const (
)

// NewEngineFunc creates a new engine.
type NewEngineFunc func(path string, walPath string, options EngineOptions) Engine
type NewEngineFunc func(id uint64, path string, walPath string, options EngineOptions) Engine

// newEngineFuncs is a lookup of engine constructors by name.
var newEngineFuncs = make(map[string]NewEngineFunc)
Expand All @@ -87,10 +87,10 @@ func RegisteredEngines() []string {

// NewEngine returns an instance of an engine based on its format.
// If the path does not exist then the DefaultFormat is used.
func NewEngine(path string, walPath string, options EngineOptions) (Engine, error) {
func NewEngine(id uint64, path string, walPath string, options EngineOptions) (Engine, error) {
// Create a new engine
if _, err := os.Stat(path); os.IsNotExist(err) {
return newEngineFuncs[options.EngineVersion](path, walPath, options), nil
return newEngineFuncs[options.EngineVersion](id, path, walPath, options), nil
}

// If it's a dir then it's a tsm1 engine
Expand All @@ -109,12 +109,13 @@ func NewEngine(path string, walPath string, options EngineOptions) (Engine, erro
return nil, fmt.Errorf("invalid engine format: %q", format)
}

return fn(path, walPath, options), nil
return fn(id, path, walPath, options), nil
}

// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
// EngineVersion names the engine format. NewEngine uses it as the key into
// newEngineFuncs when the data path does not exist yet.
EngineVersion string
// ShardID is the ID of the shard the engine belongs to.
// NOTE(review): nothing in the visible code reads this field; the shard ID
// is passed to NewEngine as a separate id argument. Confirm whether this
// field is still needed.
ShardID uint64

// Config carries the engine configuration; the tsm1 engine reads
// Config.CacheMaxMemorySize from it when constructing its cache.
Config Config
}
Expand Down
6 changes: 4 additions & 2 deletions tsdb/engine/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ type Engine struct {
snapDone chan struct{} // channel to signal snapshot compactions to stop
snapWG sync.WaitGroup // waitgroup for running snapshot compactions

id uint64
path string
logger *log.Logger // Logger to be used for important messages
traceLogger *log.Logger // Logger to be used when trace-logging is on.
Expand Down Expand Up @@ -125,7 +126,7 @@ type Engine struct {
}

// NewEngine returns a new instance of Engine.
func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
func NewEngine(id uint64, path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine {
w := NewWAL(walPath)
fs := NewFileStore(path)
cache := NewCache(uint64(opt.Config.CacheMaxMemorySize), path)
Expand All @@ -136,6 +137,7 @@ func NewEngine(path string, walPath string, opt tsdb.EngineOptions) tsdb.Engine
}

e := &Engine{
id: id,
path: path,
logger: log.New(os.Stderr, "[tsm1] ", log.LstdFlags),
traceLogger: log.New(ioutil.Discard, "[tsm1] ", log.LstdFlags),
Expand Down Expand Up @@ -1268,7 +1270,7 @@ func (e *Engine) createVarRefIterator(opt influxql.IteratorOptions, aggregate bo

for _, mm := range mms {
// Determine tagsets for this measurement based on dimensions and filters.
tagSets, err := mm.TagSets(opt.Dimensions, opt.Condition)
tagSets, err := mm.TagSets(e.id, opt.Dimensions, opt.Condition)
if err != nil {
return err
}
Expand Down
35 changes: 24 additions & 11 deletions tsdb/engine/tsm1/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func TestEngine_Backup(t *testing.T) {
p3 := MustParsePointString("cpu,host=C value=1.3 3000000000")

// Write those points to the engine.
e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
e := tsm1.NewEngine(1, f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)

// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
Expand Down Expand Up @@ -222,7 +222,9 @@ func TestEngine_CreateIterator_Cache_Ascending(t *testing.T) {

e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
Expand Down Expand Up @@ -275,7 +277,9 @@ func TestEngine_CreateIterator_Cache_Descending(t *testing.T) {

e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
Expand Down Expand Up @@ -328,7 +332,9 @@ func TestEngine_CreateIterator_TSM_Ascending(t *testing.T) {

e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
Expand Down Expand Up @@ -382,7 +388,9 @@ func TestEngine_CreateIterator_TSM_Descending(t *testing.T) {

e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A value=1.2 2000000000`,
Expand Down Expand Up @@ -437,7 +445,9 @@ func TestEngine_CreateIterator_Aux(t *testing.T) {
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("F", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A F=100 1000000000`,
Expand Down Expand Up @@ -497,7 +507,9 @@ func TestEngine_CreateIterator_Condition(t *testing.T) {
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("X", influxql.Float, false)
e.MeasurementFields("cpu").CreateFieldIfNotExists("Y", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

if err := e.WritePointsString(
`cpu,host=A value=1.1 1000000000`,
`cpu,host=A X=10 1000000000`,
Expand Down Expand Up @@ -560,7 +572,7 @@ func TestEngine_DeleteSeries(t *testing.T) {
p3 := MustParsePointString("cpu,host=A sum=1.3 3000000000")

// Write those points to the engine.
e := tsm1.NewEngine(f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)
e := tsm1.NewEngine(1, f.Name(), walPath, tsdb.NewEngineOptions()).(*tsm1.Engine)

// mock the planner so compactions don't run during the test
e.CompactionPlan = &mockPlanner{}
Expand Down Expand Up @@ -721,7 +733,8 @@ func MustInitBenchmarkEngine(pointN int) *Engine {
// Initialize metadata.
e.Index().CreateMeasurementIndexIfNotExists("cpu")
e.MeasurementFields("cpu").CreateFieldIfNotExists("value", influxql.Float, false)
e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si := e.Index().CreateSeriesIndexIfNotExists("cpu", tsdb.NewSeries("cpu,host=A", models.NewTags(map[string]string{"host": "A"})))
si.AssignShard(1)

// Generate time ascending points with jittered time & value.
rand := rand.New(rand.NewSource(0))
Expand Down Expand Up @@ -770,7 +783,7 @@ func NewEngine() *Engine {
panic(err)
}
return &Engine{
Engine: tsm1.NewEngine(
Engine: tsm1.NewEngine(1,
filepath.Join(root, "data"),
filepath.Join(root, "wal"),
tsdb.NewEngineOptions()).(*tsm1.Engine),
Expand Down Expand Up @@ -802,7 +815,7 @@ func (e *Engine) Reopen() error {
return err
}

e.Engine = tsm1.NewEngine(
e.Engine = tsm1.NewEngine(1,
filepath.Join(e.root, "data"),
filepath.Join(e.root, "wal"),
tsdb.NewEngineOptions()).(*tsm1.Engine)
Expand Down
5 changes: 4 additions & 1 deletion tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -771,7 +771,7 @@ func (m *Measurement) filters(condition influxql.Expr) ([]uint64, map[uint64]inf
// This will also populate the TagSet objects with the series IDs that match each tagset and any
// influx filter expression that goes with the series
// TODO: this shouldn't be exported. However, until tx.go and the engine get refactored into tsdb, we need it.
func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
func (m *Measurement) TagSets(shardID uint64, dimensions []string, condition influxql.Expr) ([]*influxql.TagSet, error) {
m.mu.RLock()

// get the unique set of series ids and the filters that should be applied to each
Expand All @@ -787,6 +787,9 @@ func (m *Measurement) TagSets(dimensions []string, condition influxql.Expr) ([]*
tagSets := make(map[string]*influxql.TagSet, 64)
for _, id := range ids {
s := m.seriesByID[id]
if !s.Assigned(shardID) {
continue
}
tags := make(map[string]string, len(dimensions))

// Build the TagSet for this series.
Expand Down
2 changes: 1 addition & 1 deletion tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (s *Shard) Open() error {
}

// Initialize underlying engine.
e, err := NewEngine(s.path, s.walPath, s.options)
e, err := NewEngine(s.id, s.path, s.walPath, s.options)
if err != nil {
return err
}
Expand Down

0 comments on commit 332de12

Please sign in to comment.