Skip to content

Commit

Permalink
Merge pull request #2752 from influxdb/pd-drop-measurement
Browse files Browse the repository at this point in the history
Wire up DROP MEASUREMENT
  • Loading branch information
pauldix committed Jun 3, 2015
2 parents f5d59ec + a768576 commit 9944678
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 24 deletions.
4 changes: 2 additions & 2 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ var (
ErrFieldTypeConflict = errors.New("field type conflict")
)

func ErrDatabaseNotFound(name string) error { return Errorf("database not found: %s", name) }
func ErrDatabaseNotFound(name string) error { return fmt.Errorf("database not found: %s", name) }

func ErrMeasurementNotFound(name string) error { return Errorf("measurement not found: %s", name) }
func ErrMeasurementNotFound(name string) error { return fmt.Errorf("measurement not found: %s", name) }

func Errorf(format string, a ...interface{}) (err error) {
if _, file, line, ok := runtime.Caller(2); ok {
Expand Down
54 changes: 50 additions & 4 deletions tsdb/meta.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,13 @@ func NewDatabaseIndex() *DatabaseIndex {
}
}

// Measurement returns the measurement object from the index by the name
func (d *DatabaseIndex) Measurement(name string) *Measurement {
d.mu.RLock()
defer d.mu.RUnlock()
return d.measurements[name]
}

// createSeriesIndexIfNotExists adds the series for the given measurement to the index and sets its ID or returns the existing series object
func (s *DatabaseIndex) createSeriesIndexIfNotExists(measurementName string, series *Series) *Series {
// if there is a measurement for this id, it's already been added
Expand Down Expand Up @@ -210,16 +217,40 @@ func (db *DatabaseIndex) Measurements() Measurements {
return measurements
}

// deleteSeries removes the series keys and their tags from the index
func (db *DatabaseIndex) deleteSeries(keys []string) {
// DropMeasurement removes the measurement and all of its underlying series from the database index
func (db *DatabaseIndex) DropMeasurement(name string) {
db.mu.Lock()
defer db.mu.Unlock()

m := db.measurements[name]
if m == nil {
return
}

delete(db.measurements, name)
for _, s := range m.seriesByID {
delete(db.series, s.Key)
}

var names []string
for _, n := range db.names {
if n != name {
names = append(names, n)
}
}
db.names = names
}

// DropSeries removes the series keys and their tags from the index
func (db *DatabaseIndex) DropSeries(keys []string) {
db.mu.Lock()
defer db.mu.Unlock()
for _, k := range keys {
series := db.series[k]
if series == nil {
continue
}
series.measurement.dropSeries(series.id)
series.measurement.DropSeries(series.id)
}
}

Expand All @@ -228,6 +259,7 @@ func (db *DatabaseIndex) deleteSeries(keys []string) {
// object. Generally these methods are only accessed from Index, which is responsible for ensuring
// go routine safe access.
type Measurement struct {
mu sync.RWMutex
Name string `json:"name,omitempty"`
FieldNames map[string]struct{} `json:"fieldNames,omitempty"`
index *DatabaseIndex
Expand All @@ -254,6 +286,17 @@ func NewMeasurement(name string, idx *DatabaseIndex) *Measurement {
}
}

// seriesKeys returns the keys of every series in this measurement
func (m *Measurement) seriesKeys() []string {
m.mu.RLock()
defer m.mu.RUnlock()
var keys []string
for _, s := range m.seriesByID {
keys = append(keys, s.Key)
}
return keys
}

// HasTagKey returns true if at least one series in this measurement has written a value for the passed in tag key
func (m *Measurement) HasTagKey(k string) bool {
return m.seriesByTagKeyValue[k] != nil
Expand Down Expand Up @@ -297,7 +340,10 @@ func (m *Measurement) addSeries(s *Series) bool {
}

// dropSeries will remove a series from the measurementIndex. Returns true if already removed
func (m *Measurement) dropSeries(seriesID uint64) bool {
func (m *Measurement) DropSeries(seriesID uint64) bool {
m.mu.Lock()
defer m.mu.Unlock()

if _, ok := m.seriesByID[seriesID]; !ok {
return true
}
Expand Down
42 changes: 25 additions & 17 deletions tsdb/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"log"
"os"
"runtime"
"sort"
"strings"

Expand Down Expand Up @@ -165,6 +164,7 @@ func (q *QueryExecutor) ExecuteQuery(query *influxql.Query, database string, chu
case *influxql.ShowSeriesStatement:
res = q.executeShowSeriesStatement(stmt, database)
case *influxql.DropMeasurementStatement:
// TODO: handle this in a cluster
res = q.executeDropMeasurementStatement(stmt, database)
case *influxql.ShowMeasurementsStatement:
res = q.executeShowMeasurementsStatement(stmt, database)
Expand Down Expand Up @@ -390,8 +390,28 @@ func (q *QueryExecutor) executeDropDatabaseStatement(stmt *influxql.DropDatabase
panic("not yet imlemented")
}

// executeDropMeasurementStatement removes the measurement and all series data from the local store for the given measurement
func (q *QueryExecutor) executeDropMeasurementStatement(stmt *influxql.DropMeasurementStatement, database string) *influxql.Result {
panic("not yet implemented")
// Find the database.
db := q.store.DatabaseIndex(database)
if db == nil {
return &influxql.Result{Err: ErrDatabaseNotFound(database)}
}

m := db.Measurement(stmt.Name)
if m == nil {
return &influxql.Result{Err: ErrMeasurementNotFound(stmt.Name)}
}

// first remove from the index
db.DropMeasurement(m.Name)

// now drop the raw data
if err := q.store.deleteMeasurement(m.Name, m.seriesKeys()); err != nil {
return &influxql.Result{Err: err}
}

return &influxql.Result{}
}

// executeDropSeriesStatement removes all series from the local store that match the drop query
Expand Down Expand Up @@ -422,8 +442,6 @@ func (q *QueryExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStat
if err != nil {
return &influxql.Result{Err: err}
}

// TODO: check return of walkWhereForSeriesIds for fields
} else {
// No WHERE clause so get all series IDs for this measurement.
ids = m.seriesIDs
Expand All @@ -439,7 +457,7 @@ func (q *QueryExecutor) executeDropSeriesStatement(stmt *influxql.DropSeriesStat
return &influxql.Result{Err: err}
}
// remove them from the index
db.deleteSeries(seriesKeys)
db.DropSeries(seriesKeys)

return &influxql.Result{}
}
Expand Down Expand Up @@ -1033,16 +1051,6 @@ var (
ErrNotExecuted = errors.New("not executed")
)

func ErrDatabaseNotFound(name string) error { return Errorf("database not found: %s", name) }
func ErrDatabaseNotFound(name string) error { return fmt.Errorf("database not found: %s", name) }

func ErrMeasurementNotFound(name string) error { return Errorf("measurement not found: %s", name) }

func Errorf(format string, a ...interface{}) (err error) {
if _, file, line, ok := runtime.Caller(2); ok {
a = append(a, file, line)
err = fmt.Errorf(format+" (%s:%d)", a...)
} else {
err = fmt.Errorf(format, a...)
}
return
}
func ErrMeasurementNotFound(name string) error { return fmt.Errorf("measurement not found: %s", name) }
61 changes: 60 additions & 1 deletion tsdb/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ func TestDropSeriesStatement(t *testing.T) {
}

got = executeAndGetJSON("drop series from cpu", executor)
warn("*** ", got)

got = executeAndGetJSON("select * from cpu", executor)
exepected = `[{}]`
Expand Down Expand Up @@ -110,6 +109,66 @@ func TestDropSeriesStatement(t *testing.T) {
}
}

func TestDropMeasurementStatement(t *testing.T) {
store, executor := testStoreAndExecutor()
defer os.RemoveAll(store.path)

pt := NewPoint(
"cpu",
map[string]string{"host": "server"},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)
pt2 := NewPoint(
"memory",
map[string]string{"host": "server"},
map[string]interface{}{"value": 1.0},
time.Unix(1, 2),
)

err := store.WriteToShard(shardID, []Point{pt, pt2})
if err != nil {
t.Fatalf(err.Error())
}

got := executeAndGetJSON("show series", executor)
exepected := `[{"series":[{"name":"cpu","columns":["_key","host"],"values":[["cpu,host=server","server"]]},{"name":"memory","columns":["_key","host"],"values":[["memory,host=server","server"]]}]}]`
if exepected != got {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}

got = executeAndGetJSON("drop measurement memory", executor)
exepected = `[{}]`
if exepected != got {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}

validateDrop := func() {
got = executeAndGetJSON("show series", executor)
exepected = `[{"series":[{"name":"cpu","columns":["_key","host"],"values":[["cpu,host=server","server"]]}]}]`
if exepected != got {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}
got = executeAndGetJSON("show measurements", executor)
exepected = `[{"series":[{"name":"measurements","columns":["name"],"values":[["cpu"]]}]}]`
if exepected != got {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}
got = executeAndGetJSON("select * from memory", executor)
exepected = `[{"error":"measurement not found: \"foo\".\"foo\".memory"}]`
if exepected != got {
t.Fatalf("exp: %s\ngot: %s", exepected, got)
}
}

validateDrop()
store.Close()
store = NewStore(store.path)
store.Open()
executor.store = store
validateDrop()
}

// ensure that authenticate doesn't return an error if the user count is zero and they're attempting
// to create a user.
func TestAuthenticateIfUserCountZeroAndCreateUser(t *testing.T) {
Expand Down
26 changes: 26 additions & 0 deletions tsdb/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,32 @@ func (s *Shard) deleteSeries(keys []string) error {
return nil
}

// deleteMeasurement deletes the measurement field encoding information and all underlying series from the shard
func (s *Shard) deleteMeasurement(name string, seriesKeys []string) error {
if err := s.db.Update(func(tx *bolt.Tx) error {
bm := tx.Bucket([]byte("fields"))
if err := bm.Delete([]byte(name)); err != nil {
return err
}
b := tx.Bucket([]byte("series"))
for _, k := range seriesKeys {
if err := b.Delete([]byte(k)); err != nil {
return err
}
if err := tx.DeleteBucket([]byte(k)); err != nil {
return err
}
}

return nil
}); err != nil {
_ = s.Close()
return err
}

return nil
}

func (s *Shard) createFieldsAndMeasurements(fieldsToCreate []*fieldCreate) (map[string]*measurementFields, error) {
if len(fieldsToCreate) == 0 {
return nil, nil
Expand Down
12 changes: 12 additions & 0 deletions tsdb/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,18 @@ func (s *Store) deleteSeries(keys []string) error {
return nil
}

// deleteMeasurement loops through the local shards and removes the measurement field encodings from each shard
func (s *Store) deleteMeasurement(name string, seriesKeys []string) error {
s.mu.RLock()
defer s.mu.RUnlock()
for _, sh := range s.shards {
if err := sh.deleteMeasurement(name, seriesKeys); err != nil {
return err
}
}
return nil
}

func (s *Store) loadIndexes() error {
dbs, err := ioutil.ReadDir(s.path)
if err != nil {
Expand Down

0 comments on commit 9944678

Please sign in to comment.