From f18d4912600c2bfd33113853eaa906f919bccf3b Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 23 Dec 2016 17:14:53 -0600 Subject: [PATCH] Support timezone offsets for queries The timezone for a query can now be added to the end with something like `TZ("America/Los_Angeles")` and it will localize the results of the query to be in that timezone. The offset will automatically be set to the offset for that timezone and offsets will automatically adjust for daylight savings time so grouping by a day will result in a 25 hour day once a year and a 23 hour day another day of the year. The automatic adjustment of intervals for timezone offsets changing will only happen if the group by period is greater than the timezone offset would be. That means grouping by an hour or less will not be affected by daylight savings time, but a 2 hour or 1 day interval will be. The default timezone is UTC and existing queries are unaffected by this change. When times are returned as strings (when `epoch=1` is not used), the results will be returned using the requested timezone format in RFC3339 format. --- CHANGELOG.md | 6 + cmd/influxd/run/server_helpers_test.go | 10 + cmd/influxd/run/server_test.go | 95 ++++++++++ coordinator/statement_executor.go | 3 + influxql/ast.go | 6 + influxql/emitter.go | 6 +- influxql/internal/internal.pb.go | 97 +++++----- influxql/internal/internal.proto | 1 + influxql/iterator.gen.go | 116 ++++++++++-- influxql/iterator.gen.go.tmpl | 29 ++- influxql/iterator.go | 73 ++++++- influxql/iterator_test.go | 253 +++++++++++++++++++++++++ influxql/parser.go | 38 ++++ influxql/parser_test.go | 39 ++++ 14 files changed, 701 insertions(+), 71 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c0d59e501d1..f96d36c60c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## v1.3.0 [unreleased] + +### Features + +- [#6541](https://github.com/influxdata/influxdb/issues/6541): Support timezone offsets for queries. + ## v1.2.0 [unreleased] ### Release Notes diff --git a/cmd/influxd/run/server_helpers_test.go b/cmd/influxd/run/server_helpers_test.go index bd95501e3f4..d5830667035 100644 --- a/cmd/influxd/run/server_helpers_test.go +++ b/cmd/influxd/run/server_helpers_test.go @@ -276,6 +276,16 @@ func mustParseTime(layout, value string) time.Time { return tm } +func mustParseLocation(tzname string) *time.Location { + loc, err := time.LoadLocation(tzname) + if err != nil { + panic(err) + } + return loc +} + +var LosAngeles = mustParseLocation("America/Los_Angeles") + // MustReadAll reads r. Panic on error. func MustReadAll(r io.Reader) []byte { b, err := ioutil.ReadAll(r) diff --git a/cmd/influxd/run/server_test.go b/cmd/influxd/run/server_test.go index a3fa82c044a..f3f1728cf99 100644 --- a/cmd/influxd/run/server_test.go +++ b/cmd/influxd/run/server_test.go @@ -5534,6 +5534,101 @@ func TestServer_Query_Fill(t *testing.T) { } } +func TestServer_Query_TimeZone(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig()) + defer s.Close() + + if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil { + t.Fatal(err) + } + + var writes []string + for _, start := range []time.Time{ + // One day before DST starts. + time.Date(2000, 4, 1, 0, 0, 0, 0, LosAngeles), + // Middle of DST. No change. + time.Date(2000, 6, 1, 0, 0, 0, 0, LosAngeles), + // One day before DST ends. + time.Date(2000, 10, 28, 0, 0, 0, 0, LosAngeles), + } { + ts := start + // Write every hour for 4 days. + for i := 0; i < 24*4; i++ { + writes = append(writes, fmt.Sprintf(`cpu,interval=daily value=0 %d`, ts.UnixNano())) + ts = ts.Add(time.Hour) + } + + // Write every 5 minutes for 3 hours. Start at 1 on the day with DST. + ts = start.Add(25 * time.Hour) + for i := 0; i < 12*3; i++ { + writes = append(writes, fmt.Sprintf(`cpu,interval=hourly value=0 %d`, ts.UnixNano())) + ts = ts.Add(5 * time.Minute) + } + } + + test := NewTest("db0", "rp0") + test.writes = Writes{ + &Write{data: strings.Join(writes, "\n")}, + } + + test.addQueries([]*Query{ + &Query{ + name: "timezone offset - dst start - daily", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-04-02T00:00:00-08:00' AND time < '2000-04-04T00:00:00-07:00' AND interval = 'daily' GROUP BY time(1d) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-04-02T00:00:00-08:00",23],["2000-04-03T00:00:00-07:00",24]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "timezone offset - no change - daily", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-06-01T00:00:00-07:00' AND time < '2000-06-03T00:00:00-07:00' AND interval = 'daily' GROUP BY time(1d) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-06-01T00:00:00-07:00",24],["2000-06-02T00:00:00-07:00",24]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "timezone offset - dst end - daily", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-10-29T00:00:00-07:00' AND time < '2000-10-31T00:00:00-08:00' AND interval = 'daily' GROUP BY time(1d) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-10-29T00:00:00-07:00",25],["2000-10-30T00:00:00-08:00",24]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "timezone offset - dst start - hourly", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-04-02T01:00:00-08:00' AND time < '2000-04-02T04:00:00-07:00' AND interval = 'hourly' GROUP BY time(1h) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-04-02T01:00:00-08:00",12],["2000-04-02T03:00:00-07:00",12]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "timezone offset - no change - hourly", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-06-02T01:00:00-07:00' AND time < '2000-06-02T03:00:00-07:00' AND interval = 'hourly' GROUP BY time(1h) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-06-02T01:00:00-07:00",12],["2000-06-02T02:00:00-07:00",12]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "timezone offset - dst end - hourly", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-10-29T01:00:00-07:00' AND time < '2000-10-29T02:00:00-08:00' AND interval = 'hourly' GROUP BY time(1h) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-10-29T01:00:00-07:00",12],["2000-10-29T01:00:00-08:00",12]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + }...) + + for i, query := range test.queries { + if i == 0 { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } +} + func TestServer_Query_Chunk(t *testing.T) { t.Parallel() s := OpenServer(NewConfig()) diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index fc211295c00..05e9855f649 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -399,6 +399,9 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen // Generate a row emitter from the iterator set. em := influxql.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize) em.Columns = stmt.ColumnNames() + if stmt.Location != nil { + em.Location = stmt.Location + } em.OmitTime = stmt.OmitTime defer em.Close() diff --git a/influxql/ast.go b/influxql/ast.go index d7f3573f2cf..716a4e72a2e 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -941,6 +941,9 @@ type SelectStatement struct { // The value to fill empty aggregate buckets with, if any FillValue interface{} + // The timezone for the query, if any. + Location *time.Location + // Renames the implicit time field name. TimeAlias string @@ -1486,6 +1489,9 @@ func (s *SelectStatement) String() string { if s.SOffset > 0 { _, _ = fmt.Fprintf(&buf, " SOFFSET %d", s.SOffset) } + if s.Location != nil { + _, _ = fmt.Fprintf(&buf, ` TZ("%s")`, s.Location) + } return buf.String() } diff --git a/influxql/emitter.go b/influxql/emitter.go index 0f55e2421ad..18a3ce79454 100644 --- a/influxql/emitter.go +++ b/influxql/emitter.go @@ -20,6 +20,9 @@ type Emitter struct { // The columns to attach to each row. Columns []string + // The time zone location. + Location *time.Location + // Removes the "time" column from output. // Used for meta queries where time does not apply. OmitTime bool @@ -32,6 +35,7 @@ func NewEmitter(itrs []Iterator, ascending bool, chunkSize int) *Emitter { itrs: itrs, ascending: ascending, chunkSize: chunkSize, + Location: time.UTC, } } @@ -156,7 +160,7 @@ func (e *Emitter) readAt(t int64, name string, tags Tags) []interface{} { values := make([]interface{}, len(e.itrs)+offset) if !e.OmitTime { - values[0] = time.Unix(0, t).UTC() + values[0] = time.Unix(0, t).In(e.Location) } for i, p := range e.buf { diff --git a/influxql/internal/internal.pb.go b/influxql/internal/internal.pb.go index cfafbf43951..fab87f666ff 100644 --- a/influxql/internal/internal.pb.go +++ b/influxql/internal/internal.pb.go @@ -193,6 +193,7 @@ type IteratorOptions struct { Condition *string `protobuf:"bytes,8,opt,name=Condition" json:"Condition,omitempty"` StartTime *int64 `protobuf:"varint,9,opt,name=StartTime" json:"StartTime,omitempty"` EndTime *int64 `protobuf:"varint,10,opt,name=EndTime" json:"EndTime,omitempty"` + Location *string `protobuf:"bytes,18,opt,name=Location" json:"Location,omitempty"` Ascending *bool `protobuf:"varint,11,opt,name=Ascending" json:"Ascending,omitempty"` Limit *int64 `protobuf:"varint,12,opt,name=Limit" json:"Limit,omitempty"` Offset *int64 `protobuf:"varint,13,opt,name=Offset" json:"Offset,omitempty"` @@ -284,6 +285,13 @@ func (m *IteratorOptions) GetEndTime() int64 { return 0 } +func (m *IteratorOptions) GetLocation() string { + if m != nil && m.Location != nil { + return *m.Location + } + return "" +} + func (m *IteratorOptions) GetAscending() bool { if m != nil && m.Ascending != nil { return *m.Ascending @@ -481,48 +489,49 @@ func init() { func init() { proto.RegisterFile("internal/internal.proto", fileDescriptorInternal) } var fileDescriptorInternal = []byte{ - // 685 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x54, 0xd1, 0x6a, 0xdc, 0x3a, - 0x10, 0xc5, 0xf6, 0x7a, 0x63, 0x6b, 0xb3, 0x37, 0xb9, 0x22, 0xf7, 0x46, 0x94, 0xd2, 0x1a, 0x3f, - 0x19, 0x4a, 0x37, 0x90, 0xd7, 0x42, 0x61, 0xdb, 0x24, 0xb0, 0xd0, 0x6e, 0x82, 0x1c, 0xf2, 0xae, - 0x66, 0x67, 0x8d, 0xc0, 0x2b, 0x6f, 0x65, 0xb9, 0x6c, 0xde, 0xfa, 0x1b, 0xfd, 0x86, 0x7e, 0x4c, - 0x7f, 0xa9, 0x68, 0x64, 0xaf, 0x9d, 0x14, 0x9a, 0x27, 0xcf, 0x39, 0x33, 0x92, 0x7c, 0x66, 0x8e, - 0x44, 0x4e, 0xa5, 0x32, 0xa0, 0x95, 0x28, 0xcf, 0xba, 0x60, 0xb6, 0xd5, 0x95, 0xa9, 0x68, 0x24, - 0xd5, 0xba, 0x6c, 0x76, 0x5f, 0xcb, 0xf4, 0x97, 0x4f, 0xc2, 0x9b, 0x4a, 0x2a, 0x43, 0x29, 0x19, - 0x2d, 0xc5, 0x06, 0x98, 0x97, 0xf8, 0x59, 0xcc, 0x31, 0xb6, 0xdc, 0xad, 0x28, 0x6a, 0xe6, 0x3b, - 0xce, 0xc6, 0xc8, 0xc9, 0x0d, 0xb0, 0x20, 0xf1, 0xb3, 0x80, 0x63, 0x4c, 0x8f, 0x49, 0xb0, 0x94, - 0x25, 0x1b, 0x25, 0x7e, 0x16, 0x71, 0x1b, 0xd2, 0xd7, 0x24, 0x98, 0x37, 0x3b, 0x16, 0x26, 0x41, - 0x36, 0x39, 0x9f, 0xce, 0xba, 0xf3, 0x66, 0xf3, 0x66, 0xc7, 0x6d, 0x86, 0xbe, 0x22, 0x64, 0x5e, - 0x14, 0x1a, 0x0a, 0x61, 0x60, 0xc5, 0xc6, 0x89, 0x97, 0x4d, 0xf9, 0x80, 0xb1, 0xf9, 0xab, 0xb2, - 0x12, 0xe6, 0x4e, 0x94, 0x0d, 0xb0, 0x83, 0xc4, 0xcb, 0x3c, 0x3e, 0x60, 0x68, 0x4a, 0x0e, 0x17, - 0xca, 0x40, 0x01, 0xda, 0x55, 0x44, 0x89, 0x97, 0x05, 0xfc, 0x11, 0x47, 0x13, 0x32, 0xc9, 0x8d, - 0x96, 0xaa, 0x70, 0x25, 0x71, 0xe2, 0x65, 0x31, 0x1f, 0x52, 0x76, 0x97, 0x0f, 0x55, 0x55, 0x82, - 0x50, 0xae, 0x84, 0x24, 0x5e, 0x16, 0xf1, 0x47, 0x1c, 0x7d, 0x4b, 0xc2, 0xdc, 0x08, 0x53, 0xb3, - 0x49, 0xe2, 0x65, 0x93, 0xf3, 0xd3, 0x5e, 0xcc, 0xc2, 0x80, 0x16, 0xa6, 0xd2, 0x98, 0xe6, 0xae, - 0x2a, 0xfd, 0xe9, 0xa1, 0x74, 0xfa, 0x82, 0x44, 0x17, 0xc2, 0x88, 0xdb, 0x87, 0xad, 0xeb, 0x69, - 0xc8, 0xf7, 0xf8, 0x89, 0x38, 0xff, 0x59, 0x71, 0xc1, 0xf3, 0xe2, 0x46, 0xcf, 0x8b, 0x0b, 0xff, - 0x14, 0x97, 0x7e, 0x1f, 0x91, 0xa3, 0x4e, 0xc6, 0xf5, 0xd6, 0xc8, 0x4a, 0xe1, 0x84, 0x2f, 0x77, - 0x5b, 0xcd, 0x3c, 0xdc, 0x12, 0x63, 0x3b, 0x61, 0x3b, 0x4f, 0x3f, 0x09, 0xb2, 0xd8, 0x0d, 0x30, - 0x23, 0xe3, 0x2b, 0x09, 0xe5, 0xaa, 0x66, 0xff, 0xe2, 0x90, 0x8f, 0xfb, 0xbe, 0xdc, 0x09, 0xcd, - 0x61, 0xcd, 0xdb, 0x3c, 0x3d, 0x23, 0x07, 0x79, 0xd5, 0xe8, 0x7b, 0xa8, 0x59, 0x80, 0xa5, 0xff, - 0xf5, 0xa5, 0x9f, 0x41, 0xd4, 0x8d, 0x86, 0x0d, 0x28, 0xc3, 0xbb, 0x2a, 0x3a, 0x23, 0x91, 0x95, - 0xaa, 0xbf, 0x89, 0x12, 0x75, 0x4d, 0xce, 0xe9, 0xa0, 0xe9, 0x6d, 0x86, 0xef, 0x6b, 0x6c, 0x3b, - 0x2f, 0xe4, 0x06, 0x54, 0x6d, 0x7f, 0x1f, 0x3d, 0x17, 0xf3, 0x01, 0x63, 0x05, 0x5d, 0xc9, 0xb2, - 0x44, 0x97, 0x85, 0x1c, 0x63, 0xfa, 0x92, 0xc4, 0xf6, 0x3b, 0xb4, 0x57, 0x4f, 0xd8, 0xec, 0xc7, - 0x4a, 0xad, 0xa4, 0x6d, 0x08, 0x5a, 0x2b, 0xe6, 0x3d, 0x61, 0xb3, 0xb9, 0x11, 0xda, 0xe0, 0x3d, - 0x88, 0x71, 0x36, 0x3d, 0x41, 0x19, 0x39, 0xb8, 0x54, 0x2b, 0xcc, 0x11, 0xcc, 0x75, 0xd0, 0xae, - 0x9b, 0xd7, 0xf7, 0xa0, 0x56, 0x52, 0x15, 0xe8, 0xa6, 0x88, 0xf7, 0x04, 0x3d, 0x21, 0xe1, 0x27, - 0xb9, 0x91, 0x86, 0x1d, 0xe2, 0x2a, 0x07, 0xe8, 0xff, 0x64, 0x7c, 0xbd, 0x5e, 0xd7, 0x60, 0xd8, - 0x14, 0xe9, 0x16, 0x59, 0x3e, 0x77, 0xe5, 0xff, 0x38, 0xde, 0x21, 0x7b, 0x7a, 0xde, 0x2e, 0x38, - 0x72, 0xa7, 0xe7, 0xfd, 0x8a, 0x0b, 0x58, 0x35, 0x5b, 0x60, 0xc7, 0x78, 0x74, 0x8b, 0xd2, 0x77, - 0xe4, 0x70, 0x30, 0x85, 0x9a, 0xbe, 0x21, 0xe1, 0xc2, 0xc0, 0xa6, 0x66, 0xde, 0xdf, 0x86, 0xe5, - 0x6a, 0xd2, 0x1f, 0x1e, 0x99, 0x0c, 0xe8, 0xce, 0xf5, 0x5f, 0x44, 0x0d, 0xad, 0x7f, 0xf6, 0x98, - 0x66, 0xe4, 0x88, 0x83, 0x01, 0x65, 0x7b, 0x78, 0x53, 0x95, 0xf2, 0xfe, 0x01, 0xad, 0x1f, 0xf3, - 0xa7, 0xf4, 0xfe, 0x2d, 0x0a, 0x9c, 0x03, 0xf1, 0x2d, 0x3a, 0x21, 0x21, 0x87, 0x02, 0x76, 0xad, - 0xd3, 0x1d, 0xb0, 0xe7, 0x2d, 0xea, 0x5b, 0xa1, 0x0b, 0x30, 0xad, 0xbf, 0xf7, 0x38, 0x7d, 0xdf, - 0xdb, 0x08, 0xff, 0xab, 0xd1, 0x02, 0xe7, 0xe9, 0x61, 0x5f, 0xf6, 0x78, 0xd0, 0x62, 0x7f, 0xd8, - 0xe2, 0x74, 0x4e, 0xa6, 0x8f, 0x6e, 0x38, 0xf6, 0x16, 0xb4, 0x84, 0x7a, 0xd9, 0xee, 0xd1, 0x41, - 0xbb, 0x05, 0xbe, 0xa2, 0xcb, 0x6e, 0x0b, 0x87, 0xd2, 0x19, 0x19, 0xbb, 0xcb, 0x60, 0x2f, 0xd0, - 0x9d, 0x28, 0xdb, 0xd7, 0xd5, 0x86, 0xf8, 0x90, 0xda, 0xc7, 0xc1, 0x77, 0xae, 0xb4, 0xf1, 0xef, - 0x00, 0x00, 0x00, 0xff, 0xff, 0x8c, 0x01, 0x5d, 0x11, 0xb2, 0x05, 0x00, 0x00, + // 695 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x54, 0xdd, 0x6a, 0xdb, 0x4c, + 0x10, 0x45, 0x92, 0xe5, 0x58, 0xeb, 0xf8, 0x4b, 0xbe, 0x25, 0x6d, 0x96, 0x52, 0x5a, 0xa1, 0x2b, + 0x41, 0xa9, 0x03, 0xb9, 0x2d, 0x14, 0xdc, 0x26, 0x01, 0x43, 0xea, 0x84, 0x75, 0xc8, 0xfd, 0xd6, + 0x1e, 0x8b, 0x05, 0x79, 0xe5, 0xae, 0x56, 0xc5, 0x79, 0x94, 0x42, 0xdf, 0xa0, 0x0f, 0xd3, 0x57, + 0x2a, 0x3b, 0xab, 0xbf, 0xa4, 0xd0, 0x5c, 0x79, 0xce, 0x99, 0xf1, 0x68, 0x67, 0xe6, 0xcc, 0x90, + 0x53, 0xa9, 0x0c, 0x68, 0x25, 0xf2, 0xb3, 0xc6, 0x98, 0xee, 0x74, 0x61, 0x0a, 0x3a, 0x92, 0x6a, + 0x93, 0x57, 0xfb, 0x6f, 0x79, 0xf2, 0xdb, 0x27, 0xe1, 0x6d, 0x21, 0x95, 0xa1, 0x94, 0x0c, 0x16, + 0x62, 0x0b, 0xcc, 0x8b, 0xfd, 0x34, 0xe2, 0x68, 0x5b, 0xee, 0x4e, 0x64, 0x25, 0xf3, 0x1d, 0x67, + 0x6d, 0xe4, 0xe4, 0x16, 0x58, 0x10, 0xfb, 0x69, 0xc0, 0xd1, 0xa6, 0xc7, 0x24, 0x58, 0xc8, 0x9c, + 0x0d, 0x62, 0x3f, 0x1d, 0x71, 0x6b, 0xd2, 0xb7, 0x24, 0x98, 0x55, 0x7b, 0x16, 0xc6, 0x41, 0x3a, + 0x3e, 0x9f, 0x4c, 0x9b, 0xef, 0x4d, 0x67, 0xd5, 0x9e, 0x5b, 0x0f, 0x7d, 0x43, 0xc8, 0x2c, 0xcb, + 0x34, 0x64, 0xc2, 0xc0, 0x9a, 0x0d, 0x63, 0x2f, 0x9d, 0xf0, 0x1e, 0x63, 0xfd, 0x57, 0x79, 0x21, + 0xcc, 0xbd, 0xc8, 0x2b, 0x60, 0x07, 0xb1, 0x97, 0x7a, 0xbc, 0xc7, 0xd0, 0x84, 0x1c, 0xce, 0x95, + 0x81, 0x0c, 0xb4, 0x8b, 0x18, 0xc5, 0x5e, 0x1a, 0xf0, 0x47, 0x1c, 0x8d, 0xc9, 0x78, 0x69, 0xb4, + 0x54, 0x99, 0x0b, 0x89, 0x62, 0x2f, 0x8d, 0x78, 0x9f, 0xb2, 0x59, 0x3e, 0x15, 0x45, 0x0e, 0x42, + 0xb9, 0x10, 0x12, 0x7b, 0xe9, 0x88, 0x3f, 0xe2, 0xe8, 0x7b, 0x12, 0x2e, 0x8d, 0x30, 0x25, 0x1b, + 0xc7, 0x5e, 0x3a, 0x3e, 0x3f, 0xed, 0x8a, 0x99, 0x1b, 0xd0, 0xc2, 0x14, 0x1a, 0xdd, 0xdc, 0x45, + 0x25, 0xbf, 0x3c, 0x2c, 0x9d, 0xbe, 0x22, 0xa3, 0x0b, 0x61, 0xc4, 0xdd, 0xc3, 0xce, 0xf5, 0x34, + 0xe4, 0x2d, 0x7e, 0x52, 0x9c, 0xff, 0x6c, 0x71, 0xc1, 0xf3, 0xc5, 0x0d, 0x9e, 0x2f, 0x2e, 0xfc, + 0xbb, 0xb8, 0xe4, 0xe7, 0x80, 0x1c, 0x35, 0x65, 0xdc, 0xec, 0x8c, 0x2c, 0x14, 0x4e, 0xf8, 0x72, + 0xbf, 0xd3, 0xcc, 0xc3, 0x94, 0x68, 0xdb, 0x09, 0xdb, 0x79, 0xfa, 0x71, 0x90, 0x46, 0x6e, 0x80, + 0x29, 0x19, 0x5e, 0x49, 0xc8, 0xd7, 0x25, 0xfb, 0x1f, 0x87, 0x7c, 0xdc, 0xf5, 0xe5, 0x5e, 0x68, + 0x0e, 0x1b, 0x5e, 0xfb, 0xe9, 0x19, 0x39, 0x58, 0x16, 0x95, 0x5e, 0x41, 0xc9, 0x02, 0x0c, 0x7d, + 0xd1, 0x85, 0x7e, 0x01, 0x51, 0x56, 0x1a, 0xb6, 0xa0, 0x0c, 0x6f, 0xa2, 0xe8, 0x94, 0x8c, 0x6c, + 0xa9, 0xfa, 0xbb, 0xc8, 0xb1, 0xae, 0xf1, 0x39, 0xed, 0x35, 0xbd, 0xf6, 0xf0, 0x36, 0xc6, 0xb6, + 0xf3, 0x42, 0x6e, 0x41, 0x95, 0xf6, 0xf9, 0xa8, 0xb9, 0x88, 0xf7, 0x18, 0x5b, 0xd0, 0x95, 0xcc, + 0x73, 0x54, 0x59, 0xc8, 0xd1, 0xa6, 0xaf, 0x49, 0x64, 0x7f, 0xfb, 0xf2, 0xea, 0x08, 0xeb, 0xfd, + 0x5c, 0xa8, 0xb5, 0xb4, 0x0d, 0x41, 0x69, 0x45, 0xbc, 0x23, 0xac, 0x77, 0x69, 0x84, 0x36, 0xb8, + 0x07, 0x11, 0xce, 0xa6, 0x23, 0x28, 0x23, 0x07, 0x97, 0x6a, 0x8d, 0x3e, 0x82, 0xbe, 0x06, 0x5a, + 0x49, 0x5c, 0x17, 0x2b, 0x81, 0x49, 0x29, 0x26, 0x6d, 0xb1, 0xcd, 0x39, 0x2b, 0x57, 0xa0, 0xd6, + 0x52, 0x65, 0xa8, 0xb4, 0x11, 0xef, 0x08, 0x7a, 0x42, 0xc2, 0x6b, 0xb9, 0x95, 0x86, 0x1d, 0x62, + 0x46, 0x07, 0xe8, 0x4b, 0x32, 0xbc, 0xd9, 0x6c, 0x4a, 0x30, 0x6c, 0x82, 0x74, 0x8d, 0x2c, 0xbf, + 0x74, 0xe1, 0xff, 0x39, 0xde, 0x21, 0xfb, 0xb2, 0x65, 0xfd, 0x87, 0x23, 0xf7, 0xb2, 0x65, 0xf7, + 0x8f, 0x0b, 0x58, 0x57, 0x3b, 0x60, 0xc7, 0xf8, 0xe9, 0x1a, 0x25, 0x1f, 0xc8, 0x61, 0x6f, 0x42, + 0x25, 0x7d, 0x47, 0xc2, 0xb9, 0x81, 0x6d, 0xc9, 0xbc, 0x7f, 0x0d, 0xd2, 0xc5, 0x24, 0x3f, 0x3c, + 0x32, 0xee, 0xd1, 0xcd, 0x46, 0x7c, 0x15, 0x25, 0xd4, 0xda, 0x6a, 0x31, 0x4d, 0xc9, 0x11, 0x07, + 0x03, 0xca, 0xf6, 0xe2, 0xb6, 0xc8, 0xe5, 0xea, 0x01, 0xd7, 0x22, 0xe2, 0x4f, 0xe9, 0xf6, 0x4e, + 0x05, 0x4e, 0x9d, 0x78, 0xa7, 0x4e, 0x48, 0xc8, 0x21, 0x83, 0x7d, 0xbd, 0x05, 0x0e, 0xd8, 0xef, + 0xcd, 0xcb, 0x3b, 0xa1, 0x33, 0x30, 0xb5, 0xf6, 0x5b, 0x9c, 0x7c, 0xec, 0x24, 0x86, 0xef, 0xaa, + 0xb4, 0x1b, 0x8b, 0x87, 0x7d, 0x69, 0x71, 0xaf, 0xc5, 0x7e, 0xbf, 0xc5, 0xc9, 0x8c, 0x4c, 0x1e, + 0x6d, 0x3f, 0xf6, 0x16, 0xb4, 0x84, 0x72, 0x51, 0xe7, 0x68, 0xa0, 0x4d, 0x81, 0x17, 0x76, 0xd1, + 0xa4, 0x70, 0x28, 0x99, 0x92, 0xa1, 0x5b, 0x14, 0xbb, 0x5c, 0xf7, 0x22, 0xaf, 0x2f, 0xaf, 0x35, + 0xf1, 0xc8, 0xda, 0xc3, 0xe1, 0x3b, 0xc5, 0x5a, 0xfb, 0x4f, 0x00, 0x00, 0x00, 0xff, 0xff, 0xc6, + 0x77, 0x75, 0x3f, 0xce, 0x05, 0x00, 0x00, } diff --git a/influxql/internal/internal.proto b/influxql/internal/internal.proto index 5f15c9358fc..94d4c30507f 100644 --- a/influxql/internal/internal.proto +++ b/influxql/internal/internal.proto @@ -36,6 +36,7 @@ message IteratorOptions { optional string Condition = 8; optional int64 StartTime = 9; optional int64 EndTime = 10; + optional string Location = 18; optional bool Ascending = 11; optional int64 Limit = 12; optional int64 Offset = 13; diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index f76a80e1745..f4b7b23acb0 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -571,9 +571,10 @@ type floatFillIterator struct { opt IteratorOptions window struct { - name string - tags Tags - time int64 + name string + tags Tags + time int64 + offset int64 } } @@ -620,6 +621,9 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) { } itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.init = true } @@ -655,6 +659,9 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) { // Set the new interval. itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.prev = FloatPoint{Nil: true} break } @@ -710,9 +717,21 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) { // as there may be lingering points with the same timestamp in the previous // window. if itr.opt.Ascending { - itr.window.time = p.Time + int64(itr.opt.Interval.Duration) + itr.window.time += int64(itr.opt.Interval.Duration) } else { - itr.window.time = p.Time - int64(itr.opt.Interval.Duration) + itr.window.time -= int64(itr.opt.Interval.Duration) + } + + // Check to see if we have passed over an offset change and adjust the time + // to account for this new offset. + if itr.opt.Location != nil { + if _, offset := itr.opt.Zone(itr.window.time); offset != itr.window.offset { + diff := itr.window.offset - offset + if abs(diff) < int64(itr.opt.Interval.Duration) { + itr.window.time += diff + } + itr.window.offset = offset + } } return p, nil } @@ -2933,9 +2952,10 @@ type integerFillIterator struct { opt IteratorOptions window struct { - name string - tags Tags - time int64 + name string + tags Tags + time int64 + offset int64 } } @@ -2982,6 +3002,9 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) { } itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.init = true } @@ -3017,6 +3040,9 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) { // Set the new interval. itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.prev = IntegerPoint{Nil: true} break } @@ -3072,9 +3098,21 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) { // as there may be lingering points with the same timestamp in the previous // window. if itr.opt.Ascending { - itr.window.time = p.Time + int64(itr.opt.Interval.Duration) + itr.window.time += int64(itr.opt.Interval.Duration) } else { - itr.window.time = p.Time - int64(itr.opt.Interval.Duration) + itr.window.time -= int64(itr.opt.Interval.Duration) + } + + // Check to see if we have passed over an offset change and adjust the time + // to account for this new offset. + if itr.opt.Location != nil { + if _, offset := itr.opt.Zone(itr.window.time); offset != itr.window.offset { + diff := itr.window.offset - offset + if abs(diff) < int64(itr.opt.Interval.Duration) { + itr.window.time += diff + } + itr.window.offset = offset + } } return p, nil } @@ -5292,9 +5330,10 @@ type stringFillIterator struct { opt IteratorOptions window struct { - name string - tags Tags - time int64 + name string + tags Tags + time int64 + offset int64 } } @@ -5341,6 +5380,9 @@ func (itr *stringFillIterator) Next() (*StringPoint, error) { } itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.init = true } @@ -5376,6 +5418,9 @@ func (itr *stringFillIterator) Next() (*StringPoint, error) { // Set the new interval. itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.prev = StringPoint{Nil: true} break } @@ -5416,9 +5461,21 @@ func (itr *stringFillIterator) Next() (*StringPoint, error) { // as there may be lingering points with the same timestamp in the previous // window. if itr.opt.Ascending { - itr.window.time = p.Time + int64(itr.opt.Interval.Duration) + itr.window.time += int64(itr.opt.Interval.Duration) } else { - itr.window.time = p.Time - int64(itr.opt.Interval.Duration) + itr.window.time -= int64(itr.opt.Interval.Duration) + } + + // Check to see if we have passed over an offset change and adjust the time + // to account for this new offset. + if itr.opt.Location != nil { + if _, offset := itr.opt.Zone(itr.window.time); offset != itr.window.offset { + diff := itr.window.offset - offset + if abs(diff) < int64(itr.opt.Interval.Duration) { + itr.window.time += diff + } + itr.window.offset = offset + } } return p, nil } @@ -7636,9 +7693,10 @@ type booleanFillIterator struct { opt IteratorOptions window struct { - name string - tags Tags - time int64 + name string + tags Tags + time int64 + offset int64 } } @@ -7685,6 +7743,9 @@ func (itr *booleanFillIterator) Next() (*BooleanPoint, error) { } itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.init = true } @@ -7720,6 +7781,9 @@ func (itr *booleanFillIterator) Next() (*BooleanPoint, error) { // Set the new interval. itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.prev = BooleanPoint{Nil: true} break } @@ -7760,9 +7824,21 @@ func (itr *booleanFillIterator) Next() (*BooleanPoint, error) { // as there may be lingering points with the same timestamp in the previous // window. if itr.opt.Ascending { - itr.window.time = p.Time + int64(itr.opt.Interval.Duration) + itr.window.time += int64(itr.opt.Interval.Duration) } else { - itr.window.time = p.Time - int64(itr.opt.Interval.Duration) + itr.window.time -= int64(itr.opt.Interval.Duration) + } + + // Check to see if we have passed over an offset change and adjust the time + // to account for this new offset. + if itr.opt.Location != nil { + if _, offset := itr.opt.Zone(itr.window.time); offset != itr.window.offset { + diff := itr.window.offset - offset + if abs(diff) < int64(itr.opt.Interval.Duration) { + itr.window.time += diff + } + itr.window.offset = offset + } } return p, nil } diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index 09dcbb58317..a73a289d868 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -569,9 +569,10 @@ type {{$k.name}}FillIterator struct { opt IteratorOptions window struct { - name string - tags Tags - time int64 + name string + tags Tags + time int64 + offset int64 } } @@ -618,6 +619,9 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) { } itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.init = true } @@ -653,6 +657,9 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) { // Set the new interval. itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.prev = {{$k.Name}}Point{Nil: true} break } @@ -711,9 +718,21 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) { // as there may be lingering points with the same timestamp in the previous // window. if itr.opt.Ascending { - itr.window.time = p.Time + int64(itr.opt.Interval.Duration) + itr.window.time += int64(itr.opt.Interval.Duration) } else { - itr.window.time = p.Time - int64(itr.opt.Interval.Duration) + itr.window.time -= int64(itr.opt.Interval.Duration) + } + + // Check to see if we have passed over an offset change and adjust the time + // to account for this new offset. + if itr.opt.Location != nil { + if _, offset := itr.opt.Zone(itr.window.time); offset != itr.window.offset { + diff := itr.window.offset - offset + if abs(diff) < int64(itr.opt.Interval.Duration) { + itr.window.time += diff + } + itr.window.offset = offset + } } return p, nil } diff --git a/influxql/iterator.go b/influxql/iterator.go index 3f1cea8fded..a8a068fedb3 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -27,6 +27,9 @@ const ( // MaxTime is used as the maximum time value when computing an unbounded range. // This time is 2262-04-11 23:47:16.854775806 +0000 UTC MaxTime = models.MaxNanoTime + + // secToNs is the number of nanoseconds in a second. + secToNs = int64(time.Second) ) // Iterator represents a generic interface for all Iterators. @@ -714,6 +717,7 @@ type IteratorOptions struct { // Group by interval and tags. Interval Interval Dimensions []string + Location *time.Location // Fill options. Fill FillOption @@ -769,6 +773,7 @@ func newIteratorOptionsStmt(stmt *SelectStatement, sopt *SelectOptions) (opt Ite opt.EndTime = MaxTime } } + opt.Location = stmt.Location // Determine group by interval. interval, err := stmt.GroupByInterval() @@ -842,6 +847,13 @@ func (opt IteratorOptions) Window(t int64) (start, end int64) { // Subtract the offset to the time so we calculate the correct base interval. t -= int64(opt.Interval.Offset) + // Retrieve the zone offset for the start time. + var startOffset int64 + if opt.Location != nil { + _, startOffset = opt.Zone(t) + t += startOffset + } + // Truncate time by duration. dt := t % int64(opt.Interval.Duration) if dt < 0 { @@ -851,9 +863,37 @@ func (opt IteratorOptions) Window(t int64) (start, end int64) { } t -= dt + // Look for the start offset again because the first time may have been + // after the offset switch. Now that we are at midnight in UTC, we can + // lookup the zone offset again to get the real starting offset. + if opt.Location != nil { + _, adjustedOffset := opt.Zone(t) + // Do not adjust the offset if the offset change is greater than or + // equal to the duration. + if o := startOffset - adjustedOffset; o != 0 && abs(o) < int64(opt.Interval.Duration) { + startOffset = adjustedOffset + } + } + // Apply the offset. - start = t + int64(opt.Interval.Offset) + start = t + int64(opt.Interval.Offset) - startOffset end = start + int64(opt.Interval.Duration) + + // Retrieve the zone offset for the end time. + if opt.Location != nil { + _, endOffset := opt.Zone(end) + // Adjust the end time if the offset is different from the start offset. + if startOffset != endOffset { + offset := startOffset - endOffset + + // Only apply the offset if it is smaller than the duration. + // This prevents going back in time and creating time windows + // that don't make any sense. + if abs(offset) < int64(opt.Interval.Duration) { + end += offset + } + } + } return } @@ -882,6 +922,17 @@ func (opt IteratorOptions) ElapsedInterval() Interval { return Interval{Duration: time.Nanosecond} } +// Zone returns the zone information for the given time. The offset is in nanoseconds. +func (opt *IteratorOptions) Zone(ns int64) (string, int64) { + if opt.Location == nil { + return "", 0 + } + + t := time.Unix(0, ns).In(opt.Location) + name, offset := t.Zone() + return name, secToNs * int64(offset) +} + // MarshalBinary encodes opt into a binary format. func (opt *IteratorOptions) MarshalBinary() ([]byte, error) { return proto.Marshal(encodeIteratorOptions(opt)) @@ -923,6 +974,11 @@ func encodeIteratorOptions(opt *IteratorOptions) *internal.IteratorOptions { pb.Expr = proto.String(opt.Expr.String()) } + // Set the location, if set. + if opt.Location != nil { + pb.Location = proto.String(opt.Location.String()) + } + // Convert and encode aux fields as variable references. pb.Fields = make([]*internal.VarRef, len(opt.Aux)) pb.Aux = make([]string, len(opt.Aux)) @@ -977,6 +1033,14 @@ func decodeIteratorOptions(pb *internal.IteratorOptions) (*IteratorOptions, erro opt.Expr = expr } + if pb.Location != nil { + loc, err := time.LoadLocation(pb.GetLocation()) + if err != nil { + return nil, err + } + opt.Location = loc + } + // Convert and decode variable references. if fields := pb.GetFields(); fields != nil { opt.Aux = make([]VarRef, len(fields)) @@ -1221,3 +1285,10 @@ type reverseStringSlice []string func (p reverseStringSlice) Len() int { return len(p) } func (p reverseStringSlice) Less(i, j int) bool { return p[i] > p[j] } func (p reverseStringSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func abs(v int64) int64 { + if v < 0 { + return -v + } + return v +} diff --git a/influxql/iterator_test.go b/influxql/iterator_test.go index 910efbc0635..3466ee23b5d 100644 --- a/influxql/iterator_test.go +++ b/influxql/iterator_test.go @@ -697,6 +697,238 @@ func TestLimitIterator(t *testing.T) { } } +func TestFillIterator_DST_Start_GroupByDay_Ascending(t *testing.T) { + start := time.Date(2000, 4, 1, 0, 0, 0, 0, LosAngeles) + end := time.Date(2000, 4, 5, 0, 0, 0, 0, LosAngeles).Add(-time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: 24 * time.Hour, + }, + Location: LosAngeles, + Ascending: true, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + {&influxql.FloatPoint{Time: start.Add(24 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(47 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(71 * time.Hour).UnixNano(), Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_Start_GroupByDay_Descending(t *testing.T) { + start := time.Date(2000, 4, 1, 0, 0, 0, 0, LosAngeles) + end := time.Date(2000, 4, 5, 0, 0, 0, 0, LosAngeles).Add(-time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: 24 * time.Hour, + }, + Location: LosAngeles, + Ascending: false, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.Add(71 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(47 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(24 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_Start_GroupByHour_Ascending(t *testing.T) { + start := time.Date(2000, 4, 2, 0, 0, 0, 0, LosAngeles) + end := start.Add(4*time.Hour - time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: time.Hour, + }, + Location: LosAngeles, + Ascending: true, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + {&influxql.FloatPoint{Time: start.Add(1 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(2 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(3 * time.Hour).UnixNano(), Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_Start_GroupByHour_Descending(t *testing.T) { + start := time.Date(2000, 4, 2, 0, 0, 0, 0, LosAngeles) + end := start.Add(4*time.Hour - time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: time.Hour, + }, + Location: LosAngeles, + Ascending: false, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.Add(3 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(2 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(1 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_End_GroupByDay_Ascending(t *testing.T) { + start := time.Date(2000, 10, 28, 0, 0, 0, 0, LosAngeles) + end := time.Date(2000, 11, 1, 0, 0, 0, 0, LosAngeles).Add(-time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: 24 * time.Hour, + }, + Location: LosAngeles, + Ascending: true, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + {&influxql.FloatPoint{Time: start.Add(24 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(49 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(73 * time.Hour).UnixNano(), Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_End_GroupByDay_Descending(t *testing.T) { + start := time.Date(2000, 10, 28, 0, 0, 0, 0, LosAngeles) + end := time.Date(2000, 11, 1, 0, 0, 0, 0, LosAngeles).Add(-time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: 24 * time.Hour, + }, + Location: LosAngeles, + Ascending: false, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.Add(73 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(49 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(24 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_End_GroupByHour_Ascending(t *testing.T) { + start := time.Date(2000, 10, 29, 0, 0, 0, 0, LosAngeles) + end := start.Add(4*time.Hour - time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: time.Hour, + }, + Location: LosAngeles, + Ascending: true, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + {&influxql.FloatPoint{Time: start.Add(1 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(2 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(3 * time.Hour).UnixNano(), Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_End_GroupByHour_Descending(t *testing.T) { + start := time.Date(2000, 10, 29, 0, 0, 0, 0, LosAngeles) + end := start.Add(4*time.Hour - time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: time.Hour, + }, + Location: LosAngeles, + Ascending: false, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.Add(3 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(2 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(1 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + // Iterators is a test wrapper for iterators. type Iterators []influxql.Iterator @@ -806,6 +1038,27 @@ func TestIteratorOptions_Window_Default(t *testing.T) { } } +func TestIteratorOptions_Window_Location(t *testing.T) { + now := time.Date(2000, 4, 2, 12, 14, 15, 0, LosAngeles) + opt := influxql.IteratorOptions{ + Location: LosAngeles, + Interval: influxql.Interval{ + Duration: 24 * time.Hour, + }, + } + + start, end := opt.Window(now.UnixNano()) + if exp := time.Date(2000, 4, 2, 0, 0, 0, 0, LosAngeles).UnixNano(); start != exp { + t.Errorf("expected start to be %d, got %d", exp, start) + } + if exp := time.Date(2000, 4, 3, 0, 0, 0, 0, LosAngeles).UnixNano(); end != exp { + t.Errorf("expected end to be %d, got %d", exp, end) + } + if got, exp := time.Duration(end-start), 23*time.Hour; got != exp { + t.Errorf("expected duration to be %s, got %s", exp, got) + } +} + func TestIteratorOptions_SeekTime_Ascending(t *testing.T) { opt := influxql.IteratorOptions{ StartTime: 30, diff --git a/influxql/parser.go b/influxql/parser.go index 53aba4f2888..abdcf773250 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -971,6 +971,11 @@ func (p *Parser) parseSelectStatement(tr targetRequirement) (*SelectStatement, e return nil, err } + // Parse timezone: "TZ()". + if stmt.Location, err = p.parseLocation(); err != nil { + return nil, err + } + // Set if the query is a raw data query or one with an aggregate stmt.IsRawQuery = true WalkFunc(stmt.Fields, func(n Node) { @@ -2240,6 +2245,39 @@ func (p *Parser) parseFill() (FillOption, interface{}, error) { } } +// parseLocation parses the timezone call and its arguments. +func (p *Parser) parseLocation() (*time.Location, error) { + // Parse the expression first. + tok, _, lit := p.scanIgnoreWhitespace() + p.unscan() + if tok != IDENT || strings.ToLower(lit) != "tz" { + return nil, nil + } + + expr, err := p.ParseExpr() + if err != nil { + return nil, err + } + tz, ok := expr.(*Call) + if !ok { + return nil, errors.New("tz must be a function call") + } else if len(tz.Args) != 1 { + return nil, errors.New("tz requires exactly one argument") + } + + tzname, ok := tz.Args[0].(*VarRef) + if !ok { + return nil, errors.New("expected ident argument in tz()") + } + + loc, err := time.LoadLocation(tzname.Val) + if err != nil { + // Do not pass the same error message as the error may contain sensitive pathnames. + return nil, fmt.Errorf("unable to find time zone %s", tzname.Val) + } + return loc, nil +} + // parseOptionalTokenAndInt parses the specified token followed // by an int, if it exists. func (p *Parser) parseOptionalTokenAndInt(t Token) (int, error) { diff --git a/influxql/parser_test.go b/influxql/parser_test.go index 954205bacda..413333db429 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -1103,6 +1103,35 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // SELECT statement with a time zone + { + s: `SELECT mean(value) FROM cpu WHERE time >= now() - 7d GROUP BY time(1d) TZ("America/Los_Angeles")`, + stmt: &influxql.SelectStatement{ + Fields: []*influxql.Field{{ + Expr: &influxql.Call{ + Name: "mean", + Args: []influxql.Expr{ + &influxql.VarRef{Val: "value"}}, + }}}, + Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, + Condition: &influxql.BinaryExpr{ + Op: influxql.GTE, + LHS: &influxql.VarRef{Val: "time"}, + RHS: &influxql.BinaryExpr{ + Op: influxql.SUB, + LHS: &influxql.Call{Name: "now"}, + RHS: &influxql.DurationLiteral{Val: 7 * 24 * time.Hour}, + }, + }, + Dimensions: []*influxql.Dimension{{ + Expr: &influxql.Call{ + Name: "time", + Args: []influxql.Expr{ + &influxql.DurationLiteral{Val: 24 * time.Hour}}}}}, + Location: LosAngeles, + }, + }, + // See issues https://github.com/influxdata/influxdb/issues/1647 // and https://github.com/influxdata/influxdb/issues/4404 // DELETE statement @@ -3032,6 +3061,16 @@ func mustParseDuration(s string) time.Duration { return d } +func mustLoadLocation(s string) *time.Location { + l, err := time.LoadLocation(s) + if err != nil { + panic(err) + } + return l +} + +var LosAngeles = mustLoadLocation("America/Los_Angeles") + func duration(v time.Duration) *time.Duration { return &v }