Skip to content

Commit

Permalink
Truncate the shard group end time if it exceeds MaxNanoTime
Browse files Browse the repository at this point in the history
Related to #6599.
  • Loading branch information
jsternberg committed May 26, 2016
1 parent d666106 commit 907c88d
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
- [#6702](https://github.com/influxdata/influxdb/issues/6702): Fix SELECT statement required privileges.
- [#6701](https://github.com/influxdata/influxdb/issues/6701): Filter out sources that do not match the shard database/retention policy.
- [#6683](https://github.com/influxdata/influxdb/issues/6683): Fix compaction planning re-compacting large TSM files
- [#6693](https://github.com/influxdata/influxdb/pull/6693): Truncate the shard group end time if it exceeds MaxNanoTime.

## v0.13.0 [2016-05-12]

Expand Down
44 changes: 44 additions & 0 deletions cmd/influxd/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6149,6 +6149,50 @@ func TestServer_Query_DuplicateMeasurements(t *testing.T) {
}
}

func TestServer_Query_LargeTimestamp(t *testing.T) {
t.Parallel()
s := OpenDefaultServer(NewConfig())
defer s.Close()

writes := []string{
fmt.Sprintf(`cpu value=100 %d`, models.MaxNanoTime.UnixNano()),
}

test := NewTest("db0", "rp0")
test.writes = Writes{
&Write{data: strings.Join(writes, "\n")},
}
test.addQueries([]*Query{
&Query{
name: `select value at max nano time`,
params: url.Values{"db": []string{"db0"}},
command: fmt.Sprintf(`SELECT value FROM cpu WHERE time <= %d`, models.MaxNanoTime.UnixNano()),
exp: `{"results":[{"series":[{"name":"cpu","columns":["time","value"],"values":[["` + models.MaxNanoTime.Format(time.RFC3339Nano) + `",100]]}]}]}`,
},
}...)

if err := test.init(s); err != nil {
t.Fatalf("test init failed: %s", err)
}

// Open a new server with the same configuration file.
// This is to ensure the meta data was marshaled correctly.
s2 := OpenServer(s.Config)
defer s2.Close()

for _, query := range test.queries {
if query.skip {
t.Logf("SKIP:: %s", query.name)
continue
}
if err := query.Execute(s); err != nil {
t.Error(query.Error(err))
} else if !query.success() {
t.Error(query.failureMessage())
}
}
}

// This test reproduced a data race with closing the
// Subscriber points channel while writes were in-flight in the PointsWriter.
func TestServer_ConcurrentPointsWriter_Subscriber(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions services/meta/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/gogo/protobuf/proto"
"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/influxdb/models"
internal "github.com/influxdata/influxdb/services/meta/internal"
)

Expand Down Expand Up @@ -371,6 +372,9 @@ func (data *Data) CreateShardGroup(database, policy string, timestamp time.Time)
sgi.ID = data.MaxShardGroupID
sgi.StartTime = timestamp.Truncate(rpi.ShardGroupDuration).UTC()
sgi.EndTime = sgi.StartTime.Add(rpi.ShardGroupDuration).UTC()
if sgi.EndTime.After(models.MaxNanoTime) {
sgi.EndTime = models.MaxNanoTime
}

data.MaxShardID++
sgi.Shards = []ShardInfo{
Expand Down

0 comments on commit 907c88d

Please sign in to comment.