Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Expired shard's directory no longer getting deleted since 0.8.4? #1078

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions cluster/cluster_configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,10 @@ func (self *ClusterConfiguration) createDefaultShardSpace(database string) (*Sha
return space, nil
}

// Given a db and series name, pick the shard that a point with the
// given timestamp should be written to. If the point is outside the
// retention period of the shard space, a nil shard is returned. The
// returned shard is always nil if err != nil
func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series string, microsecondsEpoch int64) (*ShardData, error) {
shardSpace := self.getShardSpaceToMatchSeriesName(db, series)
if shardSpace == nil {
Expand All @@ -804,6 +808,17 @@ func (self *ClusterConfiguration) GetShardToWriteToBySeriesAndTime(db, series st
return nil, err
}
}

// if the shard will be dropped anyway because of the shard space
// retention period, then return nothing. Don't try to write
retention := shardSpace.ParsedRetentionPeriod()
if retention != InfiniteRetention {
_, endTime := self.getStartAndEndBasedOnDuration(microsecondsEpoch, shardSpace.SecondsOfDuration())
if endTime.Before(time.Now().Add(-retention)) {
return nil, nil
}
}

matchingShards := make([]*ShardData, 0)
for _, s := range shardSpace.shards {
if s.IsMicrosecondInRange(microsecondsEpoch) {
Expand Down
4 changes: 3 additions & 1 deletion cluster/shard_space.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
DEFAULT_RETENTION_POLICY_DURATION = 0
)

// InfiniteRetention is the zero duration that ParsedRetentionPeriod
// returns for the "inf" retention policy; callers compare against it
// to detect shard spaces that have no retention limit.
const InfiniteRetention = time.Duration(0)

func NewShardSpace(database, name string) *ShardSpace {
s := &ShardSpace{
Database: database,
Expand Down Expand Up @@ -119,7 +121,7 @@ func (s *ShardSpace) ParsedRetentionPeriod() time.Duration {
if s.RetentionPolicy == "" {
return DEFAULT_RETENTION_POLICY_DURATION
} else if s.RetentionPolicy == "inf" {
return time.Duration(0)
return InfiniteRetention
}
d, _ := common.ParseTimeDuration(s.RetentionPolicy)
return time.Duration(d)
Expand Down
17 changes: 14 additions & 3 deletions coordinator/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -507,20 +507,31 @@ func (self *Coordinator) CommitSeriesData(db string, serieses []*protocol.Series
// TODO: this isn't needed anymore
series.SortPointsTimeDescending()

sn := series.GetName()
// use regular for loop since we update the iteration index `i' as
// we batch points that have the same timestamp
for i := 0; i < len(series.Points); {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a separate issue, I think we should refactor this loop to use a ranged loop and pull some things out of the loop.

if len(series.GetName()) == 0 {
return fmt.Errorf("Series name cannot be empty")
}

shard, err := self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, series.GetName(), series.Points[i].GetTimestamp())
ts := series.Points[i].GetTimestamp()
shard, err := self.clusterConfiguration.GetShardToWriteToBySeriesAndTime(db, sn, ts)
if err != nil {
return err
}
log.Fine("GetShardToWriteToBySeriesAndTime(%s, %s, %d) = (%s, %v)", db, sn, ts, shard, err)
firstIndex := i
timestamp := series.Points[i].GetTimestamp()
for ; i < len(series.Points) && series.Points[i].GetTimestamp() == timestamp; i++ {
for ; i < len(series.Points) && series.Points[i].GetTimestamp() == ts; i++ {
// add all points with the same timestamp
}

// if shard == nil, then the points shouldn't be written. This
// will happen if the points had timestamps earlier than the
// retention period
if shard == nil {
continue
}
newSeries := &protocol.Series{Name: series.Name, Fields: series.Fields, Points: series.Points[firstIndex:i:i]}

shardIdToShard[shard.Id()] = shard
Expand Down
6 changes: 5 additions & 1 deletion datastore/shard_datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,19 @@ func (self *ShardDatastore) incrementShardRefCountAndCloseOldestIfNeeded(id uint
func (self *ShardDatastore) ReturnShard(id uint32) {
self.shardsLock.Lock()
defer self.shardsLock.Unlock()
log.Fine("Returning shard. id = %d", id)
self.shardRefCounts[id] -= 1
if self.shardRefCounts[id] != 0 {
return
}

log.Fine("Checking if shard should be deleted. id = %d", id)
if _, ok := self.shardsToDelete[id]; ok {
self.deleteShard(id)
return
}

log.Fine("Checking if shard should be closed. id = %d", id)
if self.shardsToClose[id] {
self.closeShard(id)
}
Expand Down Expand Up @@ -251,7 +254,8 @@ func (self *ShardDatastore) DeleteShard(shardId uint32) {
// now. We have to wait until it's returned and delete
// it. ReturnShard will take care of that as soon as the reference
// count becomes 0.
if self.shardRefCounts[shardId] > 0 {
if refc := self.shardRefCounts[shardId]; refc > 0 {
log.Fine("Cannot delete shard: shardId = %d, shardRefCounts[shardId] = %d", shardId, refc)
self.shardsToDelete[shardId] = struct{}{}
return
}
Expand Down
33 changes: 22 additions & 11 deletions integration/helpers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,7 @@ type Server struct {
p *os.Process
configFile string
sslOnly bool
apiPort int
sslApiPort int
config *configuration.Configuration
args []string
}

Expand All @@ -42,7 +41,7 @@ func NewServer(configFile string, c *C) *Server {

func newServerCommon(configFile string, deleteData, ssl bool, c *C, args ...string) *Server {
config, _ := configuration.LoadConfiguration("../" + configFile)
s := &Server{configFile: configFile, apiPort: config.ApiHttpPort, sslApiPort: config.ApiHttpSslPort, sslOnly: ssl, args: args}
s := &Server{configFile: configFile, config: config, sslOnly: ssl, args: args}
if deleteData {
c.Assert(os.RemoveAll(config.DataDir), IsNil)
c.Assert(os.RemoveAll(config.WalDir), IsNil)
Expand All @@ -55,9 +54,9 @@ func newServerCommon(configFile string, deleteData, ssl bool, c *C, args ...stri
}

func (self *Server) WaitForServerToStart() {
url := fmt.Sprintf("http://localhost:%d/ping", self.apiPort)
url := fmt.Sprintf("http://localhost:%d/ping", self.ApiPort())
if self.sslOnly {
url = fmt.Sprintf("https://localhost:%d/ping", self.sslApiPort)
url = fmt.Sprintf("https://localhost:%d/ping", self.SslApiPort())
}

client := http.Client{
Expand Down Expand Up @@ -91,7 +90,7 @@ func (self *Server) WaitForServerToStart() {

func (self *Server) WaitForServerToSync() {
for i := 0; i < 600; i++ {
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/sync?u=root&p=root", self.apiPort))
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/sync?u=root&p=root", self.ApiPort()))
if err != nil {
panic(err)
}
Expand All @@ -116,7 +115,7 @@ func (self *Server) GetClient(db string, c *C) *influxdb.Client {

func (self *Server) GetClientWithUser(db, username, password string, c *C) *influxdb.Client {
client, err := influxdb.NewClient(&influxdb.ClientConfig{
Host: fmt.Sprintf("localhost:%d", self.apiPort),
Host: fmt.Sprintf("localhost:%d", self.ApiPort()),
Username: username,
Password: password,
Database: db,
Expand Down Expand Up @@ -206,8 +205,20 @@ func (self *Server) Start() error {
return nil
}

// RetentionSweepPeriod returns the StorageRetentionSweepPeriod duration
// from the configuration this server was created with.
func (self *Server) RetentionSweepPeriod() time.Duration {
return self.config.StorageRetentionSweepPeriod.Duration
}

// DataDir returns the DataDir setting from the configuration this
// server was created with.
func (self *Server) DataDir() string {
return self.config.DataDir
}

// ApiPort returns the HTTP API port (ApiHttpPort) from the
// configuration this server was created with. It is the port used to
// build the plain http:// URLs in this helper.
func (self *Server) ApiPort() int {
	return self.config.ApiHttpPort
}

// SslApiPort returns the HTTPS API port (ApiHttpSslPort) from the
// configuration this server was created with.
func (self *Server) SslApiPort() int {
return self.config.ApiHttpSslPort
}

func (self *Server) Stop() {
Expand Down Expand Up @@ -283,7 +294,7 @@ func (self *Server) QueryAsRoot(database, query string, onlyLocal bool, c *C) *S

func (self *Server) GetResponse(database, query, username, password string, onlyLocal bool, c *C) *http.Response {
encodedQuery := url.QueryEscape(query)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=%s&p=%s&q=%s", self.apiPort, database, username, password, encodedQuery)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=%s&p=%s&q=%s", self.ApiPort(), database, username, password, encodedQuery)
if onlyLocal {
fullUrl = fullUrl + "&force_local=true"
}
Expand Down Expand Up @@ -318,7 +329,7 @@ func (self *Server) QueryWithUsername(database, query string, onlyLocal bool, c

func (self *Server) VerifyForbiddenQuery(database, query string, onlyLocal bool, c *C, username, password string) string {
encodedQuery := url.QueryEscape(query)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=%s&p=%s&q=%s", self.apiPort, database, username, password, encodedQuery)
fullUrl := fmt.Sprintf("http://localhost:%d/db/%s/series?u=%s&p=%s&q=%s", self.ApiPort(), database, username, password, encodedQuery)
if onlyLocal {
fullUrl = fullUrl + "&force_local=true"
}
Expand Down Expand Up @@ -357,7 +368,7 @@ func (self *Server) Get(url string, c *C) []byte {
}

func (self *Server) Request(method, url, data string, c *C) *http.Response {
fullUrl := fmt.Sprintf("http://localhost:%d%s", self.apiPort, url)
fullUrl := fmt.Sprintf("http://localhost:%d%s", self.ApiPort(), url)
req, err := http.NewRequest(method, fullUrl, bytes.NewBufferString(data))
c.Assert(err, IsNil)
resp, err := http.DefaultClient.Do(req)
Expand Down
33 changes: 33 additions & 0 deletions integration/single_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ import (
"io/ioutil"
"net/http"
"os"
"path"
"strconv"
"strings"
"time"

influxdb "github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/datastore"
. "github.com/influxdb/influxdb/integration/helpers"
. "launchpad.net/gocheck"
)
Expand Down Expand Up @@ -143,6 +146,36 @@ func (self *SingleServerSuite) TestListSeriesRegex(c *C) {
c.Assert(maps, HasLen, 3)
}

// TestDeleteExpiredShards checks that no shard directories are left on
// disk for data that is older than the shard space's retention period.
//
// It creates a shard space with a 7d retention policy, writes points
// stamped with time 0 (second precision) twice, waits for one
// retention sweep period plus a second, and then asserts that the
// shard database directory under the server's data dir is empty.
func (self *SingleServerSuite) TestDeleteExpiredShards(c *C) {
	db := "delete_expired_shards"
	client := self.server.GetClient(db, c)
	c.Assert(client.CreateDatabase(db), IsNil)
	err := client.CreateShardSpace(db, &influxdb.ShardSpace{
		Name:            "default",
		Regex:           ".*",
		RetentionPolicy: "7d",
		ShardDuration:   "1y",
	})
	c.Assert(err, IsNil)

	// add an explicit "time" column and give every point the timestamp
	// 0, which is far outside the 7d retention period
	data := CreatePoints("test_using_deleted_shard", 1, 1000000)
	data[0].Columns = append(data[0].Columns, "time")
	for i := range data[0].Points {
		data[0].Points[i] = append(data[0].Points[i], 0)
	}
	for i := 0; i < 2; i++ {
		err = client.WriteSeriesWithTimePrecision(data, influxdb.Second)
		c.Assert(err, IsNil)
	}

	// wait for the retention sweep to kick in
	time.Sleep(self.server.RetentionSweepPeriod() + time.Second)

	f, err := os.Open(path.Join(self.server.DataDir(), datastore.SHARD_DATABASE_DIR))
	c.Assert(err, IsNil)
	// release the directory handle when the test finishes, including
	// when one of the assertions below fails
	defer f.Close()

	dirs, err := f.Readdirnames(0)
	c.Assert(err, IsNil)
	c.Assert(dirs, HasLen, 0)
}

func (self *SingleServerSuite) TestListSeriesWithSpace(c *C) {
db := "test_list_series_with_space"
client := self.server.GetClient("", c)
Expand Down