Don't write points if they are too old
Background of the bug: prior to this patch we actually tried writing
points that were older than the retention period of the shard. This
caused a race condition when it came to writing points to a shard that's
being dropped, which will happen frequently if the user is loading old
data (by accident). This is demonstrated in the test in this commit. This
bug was previously addressed in #985. It turns out the fix for #985 wasn't
enough: a user reported in #1078 that some shards are left behind and
never deleted.

It turns out that while the shard is being dropped, more write
requests could come in and end up on line `cluster/shard.go:195`, which
will cause the datastore to create a shard on disk that isn't tracked
anywhere in the metadata. This shard will live forever and never get
deleted. This fix addresses the issue by not writing old points at all.
There are still some edge cases with the current implementation, but at
least it isn't as bad as current master.

Close #1078
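
The core of the fix is a retention check before picking a shard for a point. A minimal standalone sketch of that check follows; `shardEndTime` and `shouldWrite` are hypothetical names that simplify away the real cluster-configuration plumbing and are not part of the influxdb codebase.

```go
package sketch

import "time"

// shardEndTime is a simplified stand-in for the real "start/end based on
// duration" calculation: it returns the end of the shard that would contain
// a point with timestamp ts, assuming shards aligned to shardDuration.
func shardEndTime(ts time.Time, shardDuration time.Duration) time.Time {
	return ts.Truncate(shardDuration).Add(shardDuration)
}

// shouldWrite reports whether a point with timestamp ts should be written at
// all, given the shard duration and retention period of its shard space.
// A zero retention means "keep forever".
func shouldWrite(ts time.Time, shardDuration, retention time.Duration) bool {
	if retention == 0 {
		return true // infinite retention: nothing ever expires
	}
	end := shardEndTime(ts, shardDuration)
	// If the shard that would hold this point already lies entirely before
	// the retention cutoff, skip the write instead of resurrecting a shard
	// directory that the retention sweep has (or will have) dropped.
	return !end.Before(time.Now().Add(-retention))
}
```

Dropping such writes up front is what prevents the race with the retention sweep described above.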
jvshahid committed Nov 3, 2014
1 parent 63b1169 commit 1f5f5cb
Showing 7 changed files with 94 additions and 16 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -18,6 +18,8 @@
- [Issue #1076](https://github.com/influxdb/influxdb/issues/1076). Fix
the timestamps of data points written by the collectd plugin. (Thanks,
@renchap for reporting this bug)
- [Issue #1078](https://github.com/influxdb/influxdb/issues/1078). Make sure
we don't resurrect shard directories for shards that have already expired

## v0.8.5 [2014-10-27]

15 changes: 15 additions & 0 deletions cluster/cluster_configuration.go
@@ -795,6 +795,10 @@ func (self *ClusterConfiguration) createDefaultShardSpace(database string) (*Sha
return space, nil
}

// Given a db and series name, pick a shard where a point with the
// given timestamp should be written. If the point is outside the
// retention period of the shard space then return nil. The returned
// shard is always nil if err != nil
func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (*ShardData, error) {
shardSpace := self.getShardSpaceToMatchSeriesName(db, series)
if shardSpace == nil {
@@ -804,6 +808,17 @@ func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series st
return nil, err
}
}

// if the shard will be dropped anyway because of the shard space
// retention period, then return nothing. Don't try to write
retention := shardSpace.ParsedRetentionPeriod()
if retention != InfiniteRetention {
_, endTime := self.getStartAndEndBasedOnDuration(microsecondsEpoch, shardSpace.SecondsOfDuration())
if endTime.Before(time.Now().Add(-retention)) {
return nil, nil
}
}

matchingShards := make([]*ShardData, 0)
for _, s := range shardSpace.shards {
if s.IsMicrosecondInRange(microsecondsEpoch) {
4 changes: 3 additions & 1 deletion cluster/shard_space.go
@@ -33,6 +33,8 @@ const (
DEFAULT_RETENTION_POLICY_DURATION = 0
)

const InfiniteRetention = time.Duration(0)

func NewShardSpace(database, name string) *ShardSpace {
s := &ShardSpace{
Database: database,
@@ -119,7 +121,7 @@ func (s *ShardSpace) ParsedRetentionPeriod() time.Duration {
if s.RetentionPolicy == "" {
return DEFAULT_RETENTION_POLICY_DURATION
} else if s.RetentionPolicy == "inf" {
return time.Duration(0)
return InfiniteRetention
}
d, _ := common.ParseTimeDuration(s.RetentionPolicy)
return time.Duration(d)
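
As a side note, the new `InfiniteRetention` constant makes the zero-duration sentinel explicit. A short sketch of how a caller is expected to treat it (`expiryCutoff` is a hypothetical helper, not part of this patch):

```go
package sketch

import "time"

// InfiniteRetention mirrors cluster.InfiniteRetention: the zero duration is
// the "keep data forever" sentinel, so callers must test for it explicitly
// rather than deriving a cutoff time from it.
const InfiniteRetention = time.Duration(0)

// expiryCutoff returns the instant before which data counts as expired, and
// false when the retention policy is infinite and no cutoff exists.
func expiryCutoff(retention time.Duration, now time.Time) (time.Time, bool) {
	if retention == InfiniteRetention {
		return time.Time{}, false
	}
	return now.Add(-retention), true
}
```
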
17 changes: 14 additions & 3 deletions coordinator/coordinator.go
@@ -510,20 +510,31 @@ func (self *Coordinator) CommitSeriesData(db string, serieses []*protocol.Series
// TODO: this isn't needed anymore
series.SortPointsTimeDescending()

sn := series.GetName()
// use a regular for loop since we update the iteration index `i` as
// we batch points that have the same timestamp
for i := 0; i < len(series.Points); {
if len(series.GetName()) == 0 {
return fmt.Errorf("Series name cannot be empty")
}

shard, err := self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, series.GetName(), series.Points[i].GetTimestamp())
ts := series.Points[i].GetTimestamp()
shard, err := self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, sn, ts)
if err != nil {
return err
}
log.Fine("GetShardToWriteToBySeriesAndTime(%s, %s, %d) = (%s, %v)", db, sn, ts, shard, err)
firstIndex := i
timestamp := series.Points[i].GetTimestamp()
for ; i < len(series.Points) && series.Points[i].GetTimestamp() == timestamp; i++ {
for ; i < len(series.Points) && series.Points[i].GetTimestamp() == ts; i++ {
// add all points with the same timestamp
}

// if shard == nil, then the points shouldn't be written. This
// will happen if the points had timestamps earlier than the
// retention period
if shard == nil {
continue
}
newSeries := &protocol.Series{Name: series.Name, Fields: series.Fields, Points: series.Points[firstIndex:i:i]}

shardIdToShard[shard.Id()] = shard
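
Two details of the loop above are easy to miss: the inner loop advances `i` past every point sharing a timestamp before the nil-shard check runs, so `continue` skips the whole batch rather than a single point; and the three-index slice `series.Points[firstIndex:i:i]` caps the new slice's capacity so that appends to it cannot clobber the next batch in the shared backing array. The following self-contained sketch shows the same pattern with a simplified, hypothetical `point` type (not `protocol.Point`):

```go
package sketch

// point is a simplified stand-in for a protocol point.
type point struct {
	timestamp int64
	value     float64
}

// batchByTimestamp groups consecutive points that share a timestamp and
// drops whole batches for which dropOld returns true (e.g. timestamps
// outside the retention period).
func batchByTimestamp(points []point, dropOld func(ts int64) bool) [][]point {
	var batches [][]point
	for i := 0; i < len(points); {
		first := i
		ts := points[i].timestamp
		// advance i past every point with the same timestamp
		for ; i < len(points) && points[i].timestamp == ts; i++ {
		}
		if dropOld(ts) {
			// i has already been advanced, so this skips the whole batch
			continue
		}
		// points[first:i:i] limits the capacity to i-first, so appending to
		// this batch later cannot overwrite points[i] in the backing array.
		batches = append(batches, points[first:i:i])
	}
	return batches
}
```
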
6 changes: 5 additions & 1 deletion datastore/shard_datastore.go
@@ -212,16 +212,19 @@ func (self *ShardDatastore) incrementShardRefCountAndCloseOldestIfNeeded(id uint
func (self *ShardDatastore) ReturnShard(id uint32) {
self.shardsLock.Lock()
defer self.shardsLock.Unlock()
log.Fine("Returning shard. id = %d", id)
self.shardRefCounts[id] -= 1
if self.shardRefCounts[id] != 0 {
return
}

log.Fine("Checking if shard should be deleted. id = %d", id)
if _, ok := self.shardsToDelete[id]; ok {
self.deleteShard(id)
return
}

log.Fine("Checking if shard should be closed. id = %d", id)
if self.shardsToClose[id] {
self.closeShard(id)
}
@@ -251,7 +254,8 @@ func (self *ShardDatastore) DeleteShard(shardId uint32) {
// now. We have to wait until it's returned and delete
// it. ReturnShard will take care of that as soon as the reference
// count becomes 0.
if self.shardRefCounts[shardId] > 0 {
if refc := self.shardRefCounts[shardId]; refc > 0 {
log.Fine("Cannot delete shard: shardId = %d, shardRefCounts[shardId] = %d", shardId, refc)
self.shardsToDelete[shardId] = struct{}{}
return
}
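
ReturnShard and DeleteShard together implement reference-counted deferred deletion: deleting a shard that is still checked out only marks it in `shardsToDelete`, and the last ReturnShard performs the actual removal. Below is a minimal sketch of that pattern under simplified assumptions; `refStore` is illustrative and not the real ShardDatastore.

```go
package sketch

import "sync"

// refStore tracks checked-out shards and defers deletion until the last
// reference is returned.
type refStore struct {
	mu       sync.Mutex
	refs     map[uint32]int
	toDelete map[uint32]struct{}
	deleted  []uint32 // stands in for removing the shard directory on disk
}

func newRefStore() *refStore {
	return &refStore{refs: map[uint32]int{}, toDelete: map[uint32]struct{}{}}
}

// Acquire checks a shard out, incrementing its reference count.
func (s *refStore) Acquire(id uint32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.refs[id]++
}

// Delete removes the shard immediately if nobody holds it; otherwise it only
// marks the shard so the final Return performs the deletion.
func (s *refStore) Delete(id uint32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.refs[id] > 0 {
		s.toDelete[id] = struct{}{}
		return
	}
	s.deleted = append(s.deleted, id)
}

// Return drops one reference and, if it was the last one and a deletion is
// pending, deletes the shard.
func (s *refStore) Return(id uint32) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.refs[id]--
	if s.refs[id] != 0 {
		return
	}
	if _, ok := s.toDelete[id]; ok {
		delete(s.toDelete, id)
		s.deleted = append(s.deleted, id)
	}
}
```
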
33 changes: 22 additions & 11 deletions integration/helpers/server.go
@@ -23,8 +23,7 @@ type Server struct {
p *os.Process
configFile string
sslOnly bool
apiPort int
sslApiPort int
config *configuration.Configuration
args []string
}

Expand All @@ -42,7 +41,7 @@ func NewServer(configFile string, c *C) *Server {

func newServerCommon(configFile string, deleteData, ssl bool, c *C, args ...string) *Server {
config, _ := configuration.LoadConfiguration("../" + configFile)
s := &Server{configFile: configFile, apiPort: config.ApiHttpPort, sslApiPort: config.ApiHttpSslPort, sslOnly: ssl, args: args}
s := &Server{configFile: configFile, config: config, sslOnly: ssl, args: args}
if deleteData {
c.Assert(os.RemoveAll(config.DataDir), IsNil)
c.Assert(os.RemoveAll(config.WalDir), IsNil)
@@ -55,9 +54,9 @@ func newServerCommon(configFile string, deleteData, ssl bool, c *C, args ...stri
}

func (self *Server) WaitForServerToStart() {
url := fmt.Sprintf("http://localhost:%d/ping", self.apiPort)
url := fmt.Sprintf("http://localhost:%d/ping", self.ApiPort())
if self.sslOnly {
url = fmt.Sprintf("https://localhost:%d/ping", self.sslApiPort)
url = fmt.Sprintf("https://localhost:%d/ping", self.SslApiPort())
}

client := http.Client{
@@ -91,7 +90,7 @@

func (self *Server) WaitForServerToSync() {
for i := 0; i < 600; i++ {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/sync?u=root&p=root", self.apiPort))
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/sync?u=root&p=root", self.ApiPort()))
if err != nil {
panic(err)
}
@@ -116,7 +115,7 @@ func (self *Server) GetClient(db string, c *C) *influxdb.Client {

func (self *Server) GetClientWithUser(db, username, password string, c *C) *influxdb.Client {
client, err := influxdb.NewClient(&influxdb.ClientConfig{
Host: fmt.Sprintf("localhost:%d", self.apiPort),
Host: fmt.Sprintf("localhost:%d", self.ApiPort()),
Username: username,
Password: password,
Database: db,
@@ -206,8 +205,20 @@ func (self *Server) Start() error {
return nil
}

func (self *Server) RetentionSweepPeriod() time.Duration {
return self.config.StorageRetentionSweepPeriod.Duration
}

func (self *Server) DataDir() string {
return self.config.DataDir
}

func (self *Server) ApiPort() int {
return self.apiPort
return self.config.ApiHttpPort
}

func (self *Server) SslApiPort() int {
return self.config.ApiHttpSslPort
}

func (self *Server) Stop() {
@@ -283,7 +294,7 @@ func (self *Server) QueryAsRoot(database, query string, onlyLocal bool, c *C) *S

func (self *Server) GetResponse(database, query, username, password string, onlyLocal bool, c *C) *http.Response {
encodedQuery := url.QueryEscape(query)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=%s&p=%s&q=%s", self.apiPort, database, username, password, encodedQuery)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=%s&p=%s&q=%s", self.ApiPort(), database, username, password, encodedQuery)
if onlyLocal {
fullUrl = fullUrl + "&force_local=true"
}
@@ -318,7 +329,7 @@ func (self *Server) QueryWithUsername(database, query string, onlyLocal bool, c

func (self *Server) VerifyForbiddenQuery(database, query string, onlyLocal bool, c *C, username, password string) string {
encodedQuery := url.QueryEscape(query)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=%s&p=%s&q=%s", self.apiPort, database, username, password, encodedQuery)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=%s&p=%s&q=%s", self.ApiPort(), database, username, password, encodedQuery)
if onlyLocal {
fullUrl = fullUrl + "&force_local=true"
}
@@ -357,7 +368,7 @@ func (self *Server) Get(url string, c *C) []byte {
}

func (self *Server) Request(method, url, data string, c *C) *http.Response {
fullUrl := fmt.Sprintf("http://localhost:%d%s", self.apiPort, url)
fullUrl := fmt.Sprintf("http://localhost:%d%s", self.ApiPort(), url)
req, err := http.NewRequest(method, fullUrl, bytes.NewBufferString(data))
c.Assert(err, IsNil)
resp, err := http.DefaultClient.Do(req)
33 changes: 33 additions & 0 deletions integration/single_server_test.go
@@ -8,10 +8,13 @@ import (
"io/ioutil"
"net/http"
"os"
"path"
"strconv"
"strings"
"time"

influxdb "github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/datastore"
. "github.com/influxdb/influxdb/integration/helpers"
. "launchpad.net/gocheck"
)
@@ -143,6 +146,36 @@ func (self *SingleServerSuite) TestListSeriesRegex(c *C) {
c.Assert(maps, HasLen, 3)
}

func (self *SingleServerSuite) TestDeleteExpiredShards(c *C) {
db := "delete_expired_shards"
client := self.server.GetClient(db, c)
c.Assert(client.CreateDatabase(db), IsNil)
err := client.CreateShardSpace(db, &influxdb.ShardSpace{
Name: "default",
Regex: ".*",
RetentionPolicy: "7d",
ShardDuration: "1y",
})
c.Assert(err, IsNil)

data := CreatePoints("test_using_deleted_shard", 1, 1000000)
data[0].Columns = append(data[0].Columns, "time")
for i := range data[0].Points {
data[0].Points[i] = append(data[0].Points[i], 0)
}
for i := 0; i < 2; i++ {
err = client.WriteSeriesWithTimePrecision(data, influxdb.Second)
c.Assert(err, IsNil)
}
// wait for the retention sweep to kick in
time.Sleep(self.server.RetentionSweepPeriod() + time.Second)
f, err := os.Open(path.Join(self.server.DataDir(), datastore.SHARD_DATABASE_DIR))
c.Assert(err, IsNil)
dirs, err := f.Readdirnames(0)
c.Assert(err, IsNil)
c.Assert(dirs, HasLen, 0)
}

func (self *SingleServerSuite) TestListSeriesWithSpace(c *C) {
db := "test_list_series_with_space"
client := self.server.GetClient("", c)
