Skip to content

Commit

Permalink
Merge pull request #175 from influxdb/drop-databases-and-series
Browse files Browse the repository at this point in the history
Implement drop series and revisit drop database to distribute its effect to all nodes in the cluster
  • Loading branch information
jvshahid committed Jan 10, 2014
2 parents de6a3f5 + 5c925a5 commit b993ae2
Show file tree
Hide file tree
Showing 14 changed files with 300 additions and 111 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
- [Issue #149](https://github.com/influxdb/influxdb/issues/149). Cluster admins should be able to perform reads and writes.
- [Issue #108](https://github.com/influxdb/influxdb/issues/108). Querying one point using `time =`
- [Issue #114](https://github.com/influxdb/influxdb/issues/114). Servers should periodically check that they're consistent.
- [Issue #93](https://github.com/influxdb/influxdb/issues/93). Should be able to drop a time series

## Bugfixes

Expand All @@ -176,6 +177,7 @@
- [Issue #158](https://github.com/influxdb/influxdb/issues/158). Logged deletes should be stored with the time range if missing.
- [Issue #136](https://github.com/influxdb/influxdb/issues/136). Make sure writes are replicated in order to avoid triggering replays
- [Issue #145](https://github.com/influxdb/influxdb/issues/145). Server fails to join cluster if all starting at same time.
- [Issue #176](https://github.com/influxdb/influxdb/issues/176). Drop database should take effect on all nodes

### Deprecated

Expand Down
14 changes: 14 additions & 0 deletions src/api/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func (self *HttpServer) Serve(listener net.Listener) {

// Write points to the given database
self.registerEndpoint(p, "post", "/db/:db/series", self.writePoints)
self.registerEndpoint(p, "del", "/db/:db/series/:series", self.dropSeries)
self.registerEndpoint(p, "get", "/db", self.listDatabases)
self.registerEndpoint(p, "post", "/db", self.createDatabase)
self.registerEndpoint(p, "del", "/db/:name", self.dropDatabase)
Expand Down Expand Up @@ -431,6 +432,19 @@ func (self *HttpServer) dropDatabase(w libhttp.ResponseWriter, r *libhttp.Reques
})
}

func (self *HttpServer) dropSeries(w libhttp.ResponseWriter, r *libhttp.Request) {
db := r.URL.Query().Get(":db")
series := r.URL.Query().Get(":series")

self.tryAsDbUserAndClusterAdmin(w, r, func(user common.User) (int, interface{}) {
err := self.coordinator.DropSeries(user, db, series)
if err != nil {
return errorToStatusCode(err), err.Error()
}
return libhttp.StatusNoContent, nil
})
}

type Point struct {
Timestamp int64 `json:"timestamp"`
SequenceNumber uint32 `json:"sequenceNumber"`
Expand Down
25 changes: 1 addition & 24 deletions src/api/http/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,28 +96,13 @@ func (self *MockEngine) RunQuery(_ common.User, _ string, query string, localOnl
}

type MockCoordinator struct {
coordinator.Coordinator
series []*protocol.Series
deleteQueries []*parser.DeleteQuery
db string
droppedDb string
}

func (self *MockCoordinator) GetLastSequenceNumber(replicationFactor uint8, _, _ uint32) (uint64, error) {
return 0, nil
}

func (self *MockCoordinator) DistributeQuery(_ common.User, db string, query *parser.SelectQuery, localOnly bool, yield func(*protocol.Series) error) error {
return nil
}

func (self *MockCoordinator) ReplicateDelete(request *protocol.Request) error {
return nil
}

func (self *MockCoordinator) ListSeries(_ common.User, _ string) ([]*protocol.Series, error) {
return nil, nil
}

func (self *MockCoordinator) WriteSeriesData(_ common.User, db string, series *protocol.Series) error {
self.series = append(self.series, series)
return nil
Expand All @@ -142,14 +127,6 @@ func (self *MockCoordinator) DropDatabase(_ common.User, db string) error {
return nil
}

func (self *MockCoordinator) ReplicateWrite(request *protocol.Request) error {
return nil
}

func (self *MockCoordinator) ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64) {
return
}

func (self *ApiSuite) formatUrl(path string, args ...interface{}) string {
path = fmt.Sprintf(path, args...)
port := self.listener.Addr().(*net.TCPAddr).Port
Expand Down
125 changes: 114 additions & 11 deletions src/coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,19 @@ var (

// shorter constants for readability
var (
proxyWrite = protocol.Request_PROXY_WRITE
proxyDelete = protocol.Request_PROXY_DELETE
queryRequest = protocol.Request_QUERY
listSeriesRequest = protocol.Request_LIST_SERIES
listSeriesResponse = protocol.Response_LIST_SERIES
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
replayReplication = protocol.Request_REPLICATION_REPLAY
sequenceNumber = protocol.Request_SEQUENCE_NUMBER
proxyWrite = protocol.Request_PROXY_WRITE
proxyDelete = protocol.Request_PROXY_DELETE
proxyDropDatabase = protocol.Request_PROXY_DROP_DATABASE
replicateDropDatabase = protocol.Request_REPLICATION_DROP_DATABASE
proxyDropSeries = protocol.Request_PROXY_DROP_SERIES
replicateDropSeries = protocol.Request_REPLICATION_DROP_SERIES
queryRequest = protocol.Request_QUERY
listSeriesRequest = protocol.Request_LIST_SERIES
listSeriesResponse = protocol.Response_LIST_SERIES
endStreamResponse = protocol.Response_END_STREAM
queryResponse = protocol.Response_QUERY
replayReplication = protocol.Request_REPLICATION_REPLAY
sequenceNumber = protocol.Request_SEQUENCE_NUMBER
)

func NewCoordinatorImpl(datastore datastore.Datastore, raftServer ClusterConsensus, clusterConfiguration *ClusterConfiguration) *CoordinatorImpl {
Expand Down Expand Up @@ -502,7 +506,7 @@ func (self *CoordinatorImpl) getCurrentSequenceNumber(replicationFactor uint8, o
}

func (self *CoordinatorImpl) ReplayReplication(request *protocol.Request, replicationFactor *uint8, owningServerId *uint32, lastSeenSequenceNumber *uint64) {
log.Warn("COORDINATOR: ReplayReplication: SN: %d, LS: %d, RF: %d, OS: %d", *request.SequenceNumber, *lastSeenSequenceNumber, *replicationFactor, *owningServerId)
log.Warn("COORDINATOR: ReplayReplication: LS: %d, RF: %d, OS: %d", *lastSeenSequenceNumber, *replicationFactor, *owningServerId)
key := fmt.Sprintf("%d_%d_%d_%d", *replicationFactor, *request.ClusterVersion, *request.OriginatingServerId, *owningServerId)
self.runningReplaysLock.Lock()
requestsWaitingToWrite := self.runningReplays[key]
Expand Down Expand Up @@ -702,6 +706,71 @@ func (self *CoordinatorImpl) handleSeriesDelete(user common.User, server *Cluste
return self.proxyUntilSuccess(servers, request)
}

func (self *CoordinatorImpl) handleDropDatabase(server *ClusterServer, database string) error {
owner, servers := self.clusterConfiguration.GetReplicas(server, &database)

request := self.createRequest(proxyDropDatabase, &database)
request.OriginatingServerId = &self.clusterConfiguration.localServerId
request.ClusterVersion = &self.clusterConfiguration.ClusterVersion
request.OwnerServerId = &owner.Id
replicationFactor := uint32(self.clusterConfiguration.GetDatabaseReplicationFactor(database))
request.ReplicationFactor = &replicationFactor

if server.Id == self.clusterConfiguration.localServerId {
// this is a local delete
replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database)
err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id)
if err != nil {
return self.proxyUntilSuccess(servers, request)
}
self.datastore.DropDatabase(database)
if err != nil {
log.Error("Couldn't write data to local store: ", err, request)
}

// ignoring the error because we still want to send to replicas
request.Type = &replicateDropDatabase
self.sendRequestToReplicas(request, servers)
return nil
}

// otherwise, proxy the request
return self.proxyUntilSuccess(servers, request)
}

func (self *CoordinatorImpl) handleDropSeries(server *ClusterServer, database, series string) error {
owner, servers := self.clusterConfiguration.GetReplicas(server, &database)

request := self.createRequest(proxyDropSeries, &database)
request.OriginatingServerId = &self.clusterConfiguration.localServerId
request.ClusterVersion = &self.clusterConfiguration.ClusterVersion
request.OwnerServerId = &owner.Id
request.Series = &protocol.Series{Name: &series}
replicationFactor := uint32(self.clusterConfiguration.GetDatabaseReplicationFactor(database))
request.ReplicationFactor = &replicationFactor

if server.Id == self.clusterConfiguration.localServerId {
// this is a local delete
replicationFactor := self.clusterConfiguration.GetReplicationFactor(&database)
err := self.datastore.LogRequestAndAssignSequenceNumber(request, &replicationFactor, &owner.Id)
if err != nil {
return self.proxyUntilSuccess(servers, request)
}
self.datastore.DropSeries(database, series)
if err != nil {
log.Error("Couldn't write data to local store: ", err, request)
}

// ignoring the error because we still want to send to replicas
request.Type = &replicateDropSeries
self.sendRequestToReplicas(request, servers)
return nil
}

// otherwise, proxy the request
return self.proxyUntilSuccess(servers, request)
}

func (self *CoordinatorImpl) writeSeriesToLocalStore(db *string, series *protocol.Series) error {
return self.datastore.WriteSeriesData(*db, series)
}
Expand Down Expand Up @@ -871,11 +940,45 @@ func (self *CoordinatorImpl) DropDatabase(user common.User, db string) error {
return common.NewAuthorizationError("Insufficient permission to drop database")
}

if self.clusterConfiguration.IsSingleServer() {
if err := self.datastore.DropDatabase(db); err != nil {
return err
}
} else {
servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db)
for _, server := range servers {
if err := self.handleDropDatabase(server.server, db); err != nil {
return err
}
}
}

// don't delete the metadata, we need the replication factor to be
// able to replicate the request properly
if err := self.raftServer.DropDatabase(db); err != nil {
return err
}

return self.datastore.DropDatabase(db)
return nil
}

func (self *CoordinatorImpl) DropSeries(user common.User, db, series string) error {
if !user.IsClusterAdmin() && !user.IsDbAdmin(db) && !user.HasWriteAccess(series) {
return common.NewAuthorizationError("Insufficient permission to drop series")
}

if self.clusterConfiguration.IsSingleServer() {
return self.datastore.DropSeries(db, series)
}

servers, _ := self.clusterConfiguration.GetServersToMakeQueryTo(&db)
for _, server := range servers {
if err := self.handleDropSeries(server.server, db, series); err != nil {
return err
}
}

return nil
}

func (self *CoordinatorImpl) AuthenticateDbUser(db, username, password string) (common.User, error) {
Expand Down
3 changes: 1 addition & 2 deletions src/coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,8 +257,7 @@ func (self *CoordinatorSuite) TestAutomaticDbCreations(c *C) {
}

// if the db is dropped it should remove the users as well
c.Assert(coordinator.DropDatabase(root, "db1"), IsNil)
c.Assert(coordinator.datastore.(*DatastoreMock).DroppedDatabase, Equals, "db1")
c.Assert(servers[0].DropDatabase("db1"), IsNil)
_, err = coordinator.AuthenticateDbUser("db1", "db_user", "pass")
c.Assert(err, ErrorMatches, ".*Invalid.*")
}
Expand Down
1 change: 1 addition & 0 deletions src/coordinator/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Coordinator interface {
WriteSeriesData(user common.User, db string, series *protocol.Series) error
DeleteSeriesData(user common.User, db string, query *parser.DeleteQuery, localOnly bool) error
DropDatabase(user common.User, db string) error
DropSeries(user common.User, db, series string) error
CreateDatabase(user common.User, db string, replicationFactor uint8) error
ListDatabases(user common.User) ([]*Database, error)
ListSeries(user common.User, database string) ([]*protocol.Series, error)
Expand Down
69 changes: 65 additions & 4 deletions src/coordinator/protobuf_request_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package coordinator
import (
"bytes"
log "code.google.com/p/log4go"
"common"
"datastore"
"encoding/binary"
"errors"
Expand Down Expand Up @@ -34,12 +33,10 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
if *request.Type == protocol.Request_PROXY_WRITE {
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}

location := common.RingLocation(request.Database, request.Series.Name, request.Series.Points[0].Timestamp)
ownerId := self.clusterConfig.GetOwnerIdByLocation(&location)
request.OriginatingServerId = &self.clusterConfig.localServerId
// TODO: make request logging and datastore write atomic
replicationFactor := self.clusterConfig.GetReplicationFactor(request.Database)
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, ownerId)
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
return err
}
Expand All @@ -51,6 +48,70 @@ func (self *ProtobufRequestHandler) HandleRequest(request *protocol.Request, con
// TODO: add quorum writes?
self.coordinator.ReplicateWrite(request)
return err
} else if *request.Type == protocol.Request_PROXY_DROP_SERIES {
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}

request.OriginatingServerId = &self.clusterConfig.localServerId
replicationFactor := uint8(*request.ReplicationFactor)
// TODO: make request logging and datastore write atomic
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
return err
}
err = self.db.DropSeries(*request.Database, *request.Series.Name)
if err != nil {
return err
}
err = self.WriteResponse(conn, response)
self.coordinator.ReplicateWrite(request)
return err
} else if *request.Type == protocol.Request_REPLICATION_DROP_SERIES {
replicationFactor := uint8(*request.ReplicationFactor)
// TODO: make request logging and datastore write atomic
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
switch err := err.(type) {
case datastore.SequenceMissingRequestsError:
log.Warn("Missing sequence number error: Request SN: %v Last Known SN: %v", request.GetSequenceNumber(), err.LastKnownRequestSequence)
go self.coordinator.ReplayReplication(request, &replicationFactor, request.OwnerServerId, &err.LastKnownRequestSequence)
return nil
default:
return err
}
}
return self.db.DropSeries(*request.Database, *request.Series.Name)
} else if *request.Type == protocol.Request_PROXY_DROP_DATABASE {
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}

request.OriginatingServerId = &self.clusterConfig.localServerId
replicationFactor := uint8(*request.ReplicationFactor)
// TODO: make request logging and datastore write atomic
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
return err
}
err = self.db.DropDatabase(*request.Database)
if err != nil {
return err
}
err = self.WriteResponse(conn, response)
self.coordinator.ReplicateWrite(request)
return err
} else if *request.Type == protocol.Request_REPLICATION_DROP_DATABASE {
replicationFactor := uint8(*request.ReplicationFactor)
// TODO: make request logging and datastore write atomic
err := self.db.LogRequestAndAssignSequenceNumber(request, &replicationFactor, request.OwnerServerId)
if err != nil {
switch err := err.(type) {
case datastore.SequenceMissingRequestsError:
log.Warn("Missing sequence number error: Request SN: %v Last Known SN: %v", request.GetSequenceNumber(), err.LastKnownRequestSequence)
go self.coordinator.ReplayReplication(request, &replicationFactor, request.OwnerServerId, &err.LastKnownRequestSequence)
return nil
default:
return err
}
}
return self.db.DropDatabase(*request.Database)
} else if *request.Type == protocol.Request_PROXY_DELETE {
response := &protocol.Response{RequestId: request.Id, Type: &self.writeOk}

Expand Down
1 change: 1 addition & 0 deletions src/datastore/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ type Datastore interface {
DeleteSeriesData(database string, query *parser.DeleteQuery) error
GetSeriesForDatabase(database string, yield func(string) error) error
DropDatabase(database string) error
DropSeries(database, series string) error
Close()
}
4 changes: 2 additions & 2 deletions src/datastore/leveldb_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ func (self *LevelDbDatastore) WriteSeriesData(database string, series *protocol.
return self.db.Write(self.writeOptions, wb)
}

func (self *LevelDbDatastore) dropSeries(database, series string) error {
func (self *LevelDbDatastore) DropSeries(database, series string) error {
startTimeBytes := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00}
endTimeBytes := []byte{0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF, 0xFF}

Expand All @@ -343,7 +343,7 @@ func (self *LevelDbDatastore) DropDatabase(database string) error {
defer wb.Close()

err := self.GetSeriesForDatabase(database, func(name string) error {
if err := self.dropSeries(database, name); err != nil {
if err := self.DropSeries(database, name); err != nil {
return err
}

Expand Down
Loading

0 comments on commit b993ae2

Please sign in to comment.