Support timezone offsets for queries
The timezone for a query can now be specified by appending a clause such as
`TZ("America/Los_Angeles")` to the end of the query, which localizes the
results to that timezone. The UTC offset is set automatically from the
timezone and adjusts for daylight saving time, so grouping by a day yields
one 25 hour day and one 23 hour day each year.
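
Those day lengths can be checked directly with Go's standard `time` package. The following standalone snippet is only an illustration (it is not part of this commit); it uses the same year and timezone as the tests below:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	loc, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		panic(err)
	}

	// 2000-04-02 is the day DST starts: the local day is only 23 hours long.
	springStart := time.Date(2000, 4, 2, 0, 0, 0, 0, loc)
	springEnd := time.Date(2000, 4, 3, 0, 0, 0, 0, loc)
	fmt.Println(springEnd.Sub(springStart)) // 23h0m0s

	// 2000-10-29 is the day DST ends: the local day is 25 hours long.
	fallStart := time.Date(2000, 10, 29, 0, 0, 0, 0, loc)
	fallEnd := time.Date(2000, 10, 30, 0, 0, 0, 0, loc)
	fmt.Println(fallEnd.Sub(fallStart)) // 25h0m0s
}
```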

The automatic adjustment of interval boundaries when the timezone offset
changes only happens if the GROUP BY interval is larger than the offset
change itself. That means grouping by an hour or less is not affected by
daylight saving time, but a 2 hour or 1 day interval will be.
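
For reference, the offset change at a DST transition in America/Los_Angeles is exactly one hour, which is why intervals of an hour or less are left alone. A small illustrative snippet (again, not code from this commit):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	loc, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		panic(err)
	}
	// Offsets on either side of the 2000 spring-forward transition.
	before := time.Date(2000, 4, 2, 1, 0, 0, 0, loc) // 1 AM PST (UTC-8)
	after := time.Date(2000, 4, 2, 3, 0, 0, 0, loc)  // 3 AM PDT (UTC-7)
	_, beforeOff := before.Zone()
	_, afterOff := after.Zone()
	change := time.Duration(afterOff-beforeOff) * time.Second
	// The offset changes by exactly one hour, so GROUP BY intervals of 1h or
	// less keep fixed-width buckets while larger intervals are realigned.
	fmt.Println(change) // 1h0m0s
}
```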

The default timezone is UTC and existing queries are unaffected by this
change.

When times are returned as strings (that is, when `epoch=1` is not used),
they are formatted as RFC3339 timestamps in the requested timezone.
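
As an illustration (not code from this commit), the same instant rendered with the default UTC location and with a requested timezone:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	loc, err := time.LoadLocation("America/Los_Angeles")
	if err != nil {
		panic(err)
	}
	// The same instant formatted both ways; the second line is roughly what
	// the string form looks like when a query requests this timezone.
	t := time.Date(2000, 6, 1, 7, 0, 0, 0, time.UTC)
	fmt.Println(t.Format(time.RFC3339))         // 2000-06-01T07:00:00Z
	fmt.Println(t.In(loc).Format(time.RFC3339)) // 2000-06-01T00:00:00-07:00
}
```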
jsternberg committed Dec 27, 2016
1 parent 0a04499 commit f18d491
Showing 14 changed files with 701 additions and 71 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,9 @@
## v1.3.0 [unreleased]

### Features

- [#6541](https://github.com/influxdata/influxdb/issues/6541): Support timezone offsets for queries.

## v1.2.0 [unreleased]

### Release Notes
10 changes: 10 additions & 0 deletions cmd/influxd/run/server_helpers_test.go
@@ -276,6 +276,16 @@ func mustParseTime(layout, value string) time.Time {
	return tm
}

func mustParseLocation(tzname string) *time.Location {
	loc, err := time.LoadLocation(tzname)
	if err != nil {
		panic(err)
	}
	return loc
}

var LosAngeles = mustParseLocation("America/Los_Angeles")

// MustReadAll reads r. Panic on error.
func MustReadAll(r io.Reader) []byte {
	b, err := ioutil.ReadAll(r)
95 changes: 95 additions & 0 deletions cmd/influxd/run/server_test.go
@@ -5534,6 +5534,101 @@ func TestServer_Query_Fill(t *testing.T) {
}
}

func TestServer_Query_TimeZone(t *testing.T) {
	t.Parallel()
	s := OpenServer(NewConfig())
	defer s.Close()

	if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil {
		t.Fatal(err)
	}

	var writes []string
	for _, start := range []time.Time{
		// One day before DST starts.
		time.Date(2000, 4, 1, 0, 0, 0, 0, LosAngeles),
		// Middle of DST. No change.
		time.Date(2000, 6, 1, 0, 0, 0, 0, LosAngeles),
		// One day before DST ends.
		time.Date(2000, 10, 28, 0, 0, 0, 0, LosAngeles),
	} {
		ts := start
		// Write every hour for 4 days.
		for i := 0; i < 24*4; i++ {
			writes = append(writes, fmt.Sprintf(`cpu,interval=daily value=0 %d`, ts.UnixNano()))
			ts = ts.Add(time.Hour)
		}

		// Write every 5 minutes for 3 hours. Start at 1 on the day with DST.
		ts = start.Add(25 * time.Hour)
		for i := 0; i < 12*3; i++ {
			writes = append(writes, fmt.Sprintf(`cpu,interval=hourly value=0 %d`, ts.UnixNano()))
			ts = ts.Add(5 * time.Minute)
		}
	}

	test := NewTest("db0", "rp0")
	test.writes = Writes{
		&Write{data: strings.Join(writes, "\n")},
	}

	test.addQueries([]*Query{
		&Query{
			name:    "timezone offset - dst start - daily",
			command: `SELECT count(value) FROM cpu WHERE time >= '2000-04-02T00:00:00-08:00' AND time < '2000-04-04T00:00:00-07:00' AND interval = 'daily' GROUP BY time(1d) TZ("America/Los_Angeles")`,
			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-04-02T00:00:00-08:00",23],["2000-04-03T00:00:00-07:00",24]]}]}]}`,
			params:  url.Values{"db": []string{"db0"}},
		},
		&Query{
			name:    "timezone offset - no change - daily",
			command: `SELECT count(value) FROM cpu WHERE time >= '2000-06-01T00:00:00-07:00' AND time < '2000-06-03T00:00:00-07:00' AND interval = 'daily' GROUP BY time(1d) TZ("America/Los_Angeles")`,
			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-06-01T00:00:00-07:00",24],["2000-06-02T00:00:00-07:00",24]]}]}]}`,
			params:  url.Values{"db": []string{"db0"}},
		},
		&Query{
			name:    "timezone offset - dst end - daily",
			command: `SELECT count(value) FROM cpu WHERE time >= '2000-10-29T00:00:00-07:00' AND time < '2000-10-31T00:00:00-08:00' AND interval = 'daily' GROUP BY time(1d) TZ("America/Los_Angeles")`,
			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-10-29T00:00:00-07:00",25],["2000-10-30T00:00:00-08:00",24]]}]}]}`,
			params:  url.Values{"db": []string{"db0"}},
		},
		&Query{
			name:    "timezone offset - dst start - hourly",
			command: `SELECT count(value) FROM cpu WHERE time >= '2000-04-02T01:00:00-08:00' AND time < '2000-04-02T04:00:00-07:00' AND interval = 'hourly' GROUP BY time(1h) TZ("America/Los_Angeles")`,
			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-04-02T01:00:00-08:00",12],["2000-04-02T03:00:00-07:00",12]]}]}]}`,
			params:  url.Values{"db": []string{"db0"}},
		},
		&Query{
			name:    "timezone offset - no change - hourly",
			command: `SELECT count(value) FROM cpu WHERE time >= '2000-06-02T01:00:00-07:00' AND time < '2000-06-02T03:00:00-07:00' AND interval = 'hourly' GROUP BY time(1h) TZ("America/Los_Angeles")`,
			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-06-02T01:00:00-07:00",12],["2000-06-02T02:00:00-07:00",12]]}]}]}`,
			params:  url.Values{"db": []string{"db0"}},
		},
		&Query{
			name:    "timezone offset - dst end - hourly",
			command: `SELECT count(value) FROM cpu WHERE time >= '2000-10-29T01:00:00-07:00' AND time < '2000-10-29T02:00:00-08:00' AND interval = 'hourly' GROUP BY time(1h) TZ("America/Los_Angeles")`,
			exp:     `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-10-29T01:00:00-07:00",12],["2000-10-29T01:00:00-08:00",12]]}]}]}`,
			params:  url.Values{"db": []string{"db0"}},
		},
	}...)

	for i, query := range test.queries {
		if i == 0 {
			if err := test.init(s); err != nil {
				t.Fatalf("test init failed: %s", err)
			}
		}
		if query.skip {
			t.Logf("SKIP:: %s", query.name)
			continue
		}
		if err := query.Execute(s); err != nil {
			t.Error(query.Error(err))
		} else if !query.success() {
			t.Error(query.failureMessage())
		}
	}
}

func TestServer_Query_Chunk(t *testing.T) {
	t.Parallel()
	s := OpenServer(NewConfig())
3 changes: 3 additions & 0 deletions coordinator/statement_executor.go
@@ -399,6 +399,9 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen
	// Generate a row emitter from the iterator set.
	em := influxql.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize)
	em.Columns = stmt.ColumnNames()
	if stmt.Location != nil {
		em.Location = stmt.Location
	}
	em.OmitTime = stmt.OmitTime
	defer em.Close()

6 changes: 6 additions & 0 deletions influxql/ast.go
@@ -941,6 +941,9 @@ type SelectStatement struct {
	// The value to fill empty aggregate buckets with, if any
	FillValue interface{}

	// The timezone for the query, if any.
	Location *time.Location

	// Renames the implicit time field name.
	TimeAlias string

@@ -1486,6 +1489,9 @@ func (s *SelectStatement) String() string {
	if s.SOffset > 0 {
		_, _ = fmt.Fprintf(&buf, " SOFFSET %d", s.SOffset)
	}
	if s.Location != nil {
		_, _ = fmt.Fprintf(&buf, ` TZ("%s")`, s.Location)
	}
	return buf.String()
}

6 changes: 5 additions & 1 deletion influxql/emitter.go
@@ -20,6 +20,9 @@ type Emitter struct {
	// The columns to attach to each row.
	Columns []string

	// The time zone location.
	Location *time.Location

	// Removes the "time" column from output.
	// Used for meta queries where time does not apply.
	OmitTime bool
@@ -32,6 +35,7 @@ func NewEmitter(itrs []Iterator, ascending bool, chunkSize int) *Emitter {
		itrs:      itrs,
		ascending: ascending,
		chunkSize: chunkSize,
		Location:  time.UTC,
	}
}

@@ -156,7 +160,7 @@ func (e *Emitter) readAt(t int64, name string, tags Tags) []interface{} {

	values := make([]interface{}, len(e.itrs)+offset)
	if !e.OmitTime {
-		values[0] = time.Unix(0, t).UTC()
+		values[0] = time.Unix(0, t).In(e.Location)
	}

	for i, p := range e.buf {
97 changes: 53 additions & 44 deletions influxql/internal/internal.pb.go

Generated protobuf file; diff not rendered.

1 change: 1 addition & 0 deletions influxql/internal/internal.proto
@@ -36,6 +36,7 @@ message IteratorOptions {
optional string Condition = 8;
optional int64 StartTime = 9;
optional int64 EndTime = 10;
optional string Location = 18;
optional bool Ascending = 11;
optional int64 Limit = 12;
optional int64 Offset = 13;
(Remaining changed files not shown.)
