Skip to content

Commit

Permalink
Merge pull request #1629 from influxdb/wire-up-drop-series
Browse files Browse the repository at this point in the history
Wire up drop series parsing
  • Loading branch information
corylanou committed Feb 22, 2015
2 parents 77a07b6 + 0f42be3 commit 3c512b1
Show file tree
Hide file tree
Showing 9 changed files with 506 additions and 19 deletions.
85 changes: 85 additions & 0 deletions database.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,54 @@ func (m *Measurement) addSeries(s *Series) bool {
return true
}

// removeSeries will remove a series from the measurementIndex. Returns true if already removed
func (m *Measurement) dropSeries(seriesID uint32) bool {
if _, ok := m.seriesByID[seriesID]; !ok {
return true
}
s := m.seriesByID[seriesID]
tagset := string(marshalTags(s.Tags))

delete(m.series, tagset)
delete(m.seriesByID, seriesID)

var ids []uint32
for _, id := range m.seriesIDs {
if id != seriesID {
ids = append(ids, id)
}
}
m.seriesIDs = ids

// remove this series id to the tag index on the measurement
// s.seriesByTagKeyValue is defined as map[string]map[string]seriesIDs
for k, v := range m.seriesByTagKeyValue {
values := v
for kk, vv := range values {
var ids []uint32
for _, id := range vv {
if id != seriesID {
ids = append(ids, id)
}
}
// Check to see if we have any ids, if not, remove the key
if len(ids) == 0 {
delete(values, kk)
} else {
values[kk] = ids
}
}
// If we have no values, then we delete the key
if len(values) == 0 {
delete(m.seriesByTagKeyValue, k)
} else {
m.seriesByTagKeyValue[k] = values
}
}

return true
}

// seriesByTags returns the Series that matches the given tagset.
func (m *Measurement) seriesByTags(tags map[string]string) *Series {
return m.series[string(marshalTags(tags))]
Expand Down Expand Up @@ -1002,6 +1050,17 @@ func (rp *RetentionPolicy) shardGroupByID(shardID uint64) *ShardGroup {
return nil
}

// dropSeries will delete all data with the seriesID
func (rp *RetentionPolicy) dropSeries(seriesID uint32) error {
for _, g := range rp.shardGroups {
err := g.dropSeries(seriesID)
if err != nil {
return err
}
}
return nil
}

func (rp *RetentionPolicy) removeShardGroupByID(shardID uint64) {
for i, g := range rp.shardGroups {
if g.ID == shardID {
Expand Down Expand Up @@ -1075,6 +1134,32 @@ func (db *database) addSeriesToIndex(measurementName string, s *Series) bool {
return idx.addSeries(s)
}

// dropSeries removes the series from the in memory references
func (db *database) dropSeries(seriesByMeasurement map[string][]uint32) error {
for measurement, ids := range seriesByMeasurement {
for _, id := range ids {
// if the series is already gone, return
if db.series[id] == nil {
continue
}

delete(db.series, id)

// Remove series information from measurements
db.measurements[measurement].dropSeries(id)

// Remove shard data
for _, rp := range db.policies {
if err := rp.dropSeries(id); err != nil {
return err
}
}
}
}

return nil
}

// createMeasurementIfNotExists will either add a measurement object to the index or return the existing one.
func (db *database) createMeasurementIfNotExists(name string) *Measurement {
idx := db.measurements[name]
Expand Down
21 changes: 21 additions & 0 deletions httpd/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1156,6 +1156,27 @@ func TestHandler_ShowContinuousQueries(t *testing.T) {

}

func TestHandler_DropSeries(t *testing.T) {
srvr := OpenAuthlessServer(NewMessagingClient())
srvr.CreateDatabase("foo")
srvr.CreateRetentionPolicy("foo", influxdb.NewRetentionPolicy("bar"))
s := NewHTTPServer(srvr)
defer s.Close()

status, _ := MustHTTP("POST", s.URL+`/write`, nil, nil, `{"database" : "foo", "retentionPolicy" : "bar", "points": [{"name": "cpu", "tags": {"host": "server01"},"timestamp": "2009-11-10T23:00:00Z","values": {"value": 100}}]}`)

if status != http.StatusOK {
t.Fatalf("unexpected status: %d", status)
}

query := map[string]string{"db": "foo", "q": "DROP SERIES FROM cpu"}
status, _ = MustHTTP("GET", s.URL+`/query`, query, nil, "")

if status != http.StatusOK {
t.Fatalf("unexpected status: %d", status)
}
}

func TestHandler_serveWriteSeries(t *testing.T) {
srvr := OpenAuthenticatedServer(NewMessagingClient())
srvr.CreateDatabase("foo")
Expand Down
30 changes: 28 additions & 2 deletions influxql/ast.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,11 +985,37 @@ func (s *ShowSeriesStatement) RequiredPrivileges() ExecutionPrivileges {

// DropSeriesStatement represents a command for removing a series from the database.
type DropSeriesStatement struct {
Name string
// The Id of the series being dropped (optional)
SeriesID uint32

// Data source that fields are extracted from (optional)
Source Source

// An expression evaluated on data point (optional)
Condition Expr
}

// String returns a string representation of the drop series statement.
func (s *DropSeriesStatement) String() string { return fmt.Sprintf("DROP SERIES %s", s.Name) }
func (s *DropSeriesStatement) String() string {
var buf bytes.Buffer
i, _ := buf.WriteString("DROP SERIES")

if s.Source != nil {
_, _ = buf.WriteString(" FROM ")
_, _ = buf.WriteString(s.Source.String())
}
if s.Condition != nil {
_, _ = buf.WriteString(" WHERE ")
_, _ = buf.WriteString(s.Condition.String())
}

// If we haven't written any data since the initial statement, then this was a SeriesID statement
if len(buf.String()) == i {
_, _ = buf.WriteString(fmt.Sprintf(" %d", s.SeriesID))
}

return buf.String()
}

// RequiredPrivileges returns the privilige reqired to execute a DropSeriesStatement.
func (s DropSeriesStatement) RequiredPrivileges() ExecutionPrivileges {
Expand Down
25 changes: 21 additions & 4 deletions influxql/parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,14 +859,31 @@ func (p *Parser) parseShowFieldKeysStatement() (*ShowFieldKeysStatement, error)
// This function assumes the "DROP SERIES" tokens have already been consumed.
func (p *Parser) parseDropSeriesStatement() (*DropSeriesStatement, error) {
stmt := &DropSeriesStatement{}
var err error

// Read the name of the series to drop.
lit, err := p.parseIdent()
if err != nil {
if tok, _, _ := p.scanIgnoreWhitespace(); tok == FROM {
// Parse source.
if stmt.Source, err = p.parseSource(); err != nil {
return nil, err
}
} else {
p.unscan()
}

// Parse condition: "WHERE EXPR".
if stmt.Condition, err = p.parseCondition(); err != nil {
return nil, err
}
stmt.Name = lit

// If they didn't provide a FROM or a WHERE, they need to provide the SeriesID
if stmt.Condition == nil && stmt.Source == nil {
var id int
id, err = p.parseInt(0, math.MaxUint32)
if err != nil {
return nil, err
}
stmt.SeriesID = uint32(id)
}
return stmt, nil
}

Expand Down
89 changes: 86 additions & 3 deletions influxql/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,8 +315,33 @@ func TestParser_ParseStatement(t *testing.T) {

// DROP SERIES statement
{
s: `DROP SERIES myseries`,
stmt: &influxql.DropSeriesStatement{Name: "myseries"},
s: `DROP SERIES 1`,
stmt: &influxql.DropSeriesStatement{SeriesID: 1},
},
{
s: `DROP SERIES FROM src`,
stmt: &influxql.DropSeriesStatement{Source: &influxql.Measurement{Name: "src"}},
},
{
s: `DROP SERIES WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DropSeriesStatement{
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
},
},
},
{
s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DropSeriesStatement{
Source: &influxql.Measurement{Name: "src"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
},
},
},

// SHOW CONTINUOUS QUERIES statement
Expand Down Expand Up @@ -604,7 +629,9 @@ func TestParser_ParseStatement(t *testing.T) {
{s: `DELETE`, err: `found EOF, expected FROM at line 1, char 8`},
{s: `DELETE FROM`, err: `found EOF, expected identifier at line 1, char 13`},
{s: `DELETE FROM myseries WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
{s: `DROP SERIES`, err: `found EOF, expected identifier at line 1, char 13`},
{s: `DROP SERIES`, err: `found EOF, expected number at line 1, char 13`},
{s: `DROP SERIES FROM`, err: `found EOF, expected identifier at line 1, char 18`},
{s: `DROP SERIES FROM src WHERE`, err: `found EOF, expected identifier, string, number, bool at line 1, char 28`},
{s: `SHOW CONTINUOUS`, err: `found EOF, expected QUERIES at line 1, char 17`},
{s: `SHOW RETENTION`, err: `found EOF, expected POLICIES at line 1, char 16`},
{s: `SHOW RETENTION POLICIES`, err: `found EOF, expected identifier at line 1, char 25`},
Expand Down Expand Up @@ -933,6 +960,62 @@ func TestQuoteIdent(t *testing.T) {
}
}

// Ensure DropSeriesStatement can convert to a string
func TestDropSeriesStatement_String(t *testing.T) {
var tests = []struct {
s string
stmt influxql.Statement
}{
{
s: `DROP SERIES 1`,
stmt: &influxql.DropSeriesStatement{SeriesID: 1},
},
{
s: `DROP SERIES FROM src`,
stmt: &influxql.DropSeriesStatement{Source: &influxql.Measurement{Name: "src"}},
},
{
s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DropSeriesStatement{
Source: &influxql.Measurement{Name: "src"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
},
},
},
{
s: `DROP SERIES FROM src WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DropSeriesStatement{
Source: &influxql.Measurement{Name: "src"},
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
},
},
},
{
s: `DROP SERIES WHERE host = 'hosta.influxdb.org'`,
stmt: &influxql.DropSeriesStatement{
Condition: &influxql.BinaryExpr{
Op: influxql.EQ,
LHS: &influxql.VarRef{Val: "host"},
RHS: &influxql.StringLiteral{Val: "hosta.influxdb.org"},
},
},
},
}

for _, test := range tests {
s := test.stmt.String()
if s != test.s {
t.Errorf("error rendering string. expected %s, actual: %s", test.s, s)
}
}
}

func BenchmarkParserParseStatement(b *testing.B) {
b.ReportAllocs()
s := `SELECT field FROM "series" WHERE value > 10`
Expand Down
19 changes: 16 additions & 3 deletions metastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"encoding/binary"
"fmt"
"time"
"unsafe"

"github.com/boltdb/bolt"
)
Expand Down Expand Up @@ -231,15 +230,29 @@ func (tx *metatx) createSeries(database, name string, tags map[string]string) (*

// store the tag map for the series
s := &Series{ID: uint32(id), Tags: tags}
idBytes := make([]byte, 4)
*(*uint32)(unsafe.Pointer(&idBytes[0])) = uint32(id)
idBytes := u32tob(uint32(id))

if err := b.Put(idBytes, mustMarshalJSON(s)); err != nil {
return nil, err
}
return s, nil
}

// dropSeries removes all seriesIDS for a given database/measurement
func (tx *metatx) dropSeries(database string, seriesByMeasurement map[string][]uint32) error {
for measurement, ids := range seriesByMeasurement {
b := tx.Bucket([]byte("Databases")).Bucket([]byte(database)).Bucket([]byte("Series")).Bucket([]byte(measurement))
if b != nil {
for _, id := range ids {
if err := b.Delete(u32tob(id)); err != nil {
return err
}
}
}
}
return nil
}

// loops through all the measurements and series in a database
func (tx *metatx) indexDatabase(db *database) {
// get the bucket that holds series data for the database
Expand Down
Loading

0 comments on commit 3c512b1

Please sign in to comment.