From 4f3d24190b34c1e9dfde79d1efe90267c58a4396 Mon Sep 17 00:00:00 2001 From: "Jonathan A. Sternberg" Date: Fri, 23 Dec 2016 17:14:53 -0600 Subject: [PATCH] Support timezone offsets for queries The timezone for a query can now be added to the end with something like `TZ("America/Los_Angeles")` and it will localize the results of the query to be in that timezone. The offset will automatically be set to the offset for that timezone and offsets will automatically adjust for daylight savings time so grouping by a day will result in a 25 hour day once a year and a 23 hour day another day of the year. The automatic adjustment of intervals for timezone offsets changing will only happen if the group by period is greater than the timezone offset would be. That means grouping by an hour or less will not be affected by daylight savings time, but a 2 hour or 1 day interval will be. The default timezone is UTC and existing queries are unaffected by this change. When times are returned as strings (when `epoch=1` is not used), the results will be returned using the requested timezone format in RFC3339 format. --- CHANGELOG.md | 1 + coordinator/statement_executor.go | 3 + influxql/README.md | 8 +- influxql/ast.go | 6 + influxql/emitter.go | 6 +- influxql/internal/internal.pb.go | 94 ++++++----- influxql/internal/internal.proto | 1 + influxql/iterator.gen.go | 116 +++++++++++--- influxql/iterator.gen.go.tmpl | 29 +++- influxql/iterator.go | 73 ++++++++- influxql/iterator_test.go | 253 ++++++++++++++++++++++++++++++ influxql/parser.go | 38 +++++ influxql/parser_test.go | 39 +++++ tests/server_helpers.go | 10 ++ tests/server_test.go | 95 +++++++++++ 15 files changed, 705 insertions(+), 67 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7489d509217..66087f23163 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ - [#7856](https://github.com/influxdata/influxdb/issues/7856): Failed points during an import now result in a non-zero exit code. - [#7821](https://github.com/influxdata/influxdb/issues/7821): Expose some configuration settings via SHOW DIAGNOSTICS - [#8025](https://github.com/influxdata/influxdb/issues/8025): Support single and multiline comments in InfluxQL. +- [#6541](https://github.com/influxdata/influxdb/issues/6541): Support timezone offsets for queries. ### Bugfixes diff --git a/coordinator/statement_executor.go b/coordinator/statement_executor.go index feea1a1ed73..0260daaed81 100644 --- a/coordinator/statement_executor.go +++ b/coordinator/statement_executor.go @@ -438,6 +438,9 @@ func (e *StatementExecutor) executeSelectStatement(stmt *influxql.SelectStatemen // Generate a row emitter from the iterator set. em := influxql.NewEmitter(itrs, stmt.TimeAscending(), ctx.ChunkSize) em.Columns = stmt.ColumnNames() + if stmt.Location != nil { + em.Location = stmt.Location + } em.OmitTime = stmt.OmitTime defer em.Close() diff --git a/influxql/README.md b/influxql/README.md index 9da91cf5de7..4665eb6f61a 100644 --- a/influxql/README.md +++ b/influxql/README.md @@ -787,7 +787,8 @@ REVOKE READ ON "mydb" FROM "jdoe" ``` select_stmt = "SELECT" fields from_clause [ into_clause ] [ where_clause ] [ group_by_clause ] [ order_by_clause ] [ limit_clause ] - [ offset_clause ] [ slimit_clause ] [ soffset_clause ] . + [ offset_clause ] [ slimit_clause ] [ soffset_clause ] + [ timezone_clause ] . 
``` #### Examples: @@ -798,6 +799,9 @@ SELECT mean("value") FROM "cpu" WHERE "region" = 'uswest' GROUP BY time(10m) fil -- select from all measurements beginning with cpu into the same measurement name in the cpu_1h retention policy SELECT mean("value") INTO "cpu_1h".:MEASUREMENT FROM /cpu.*/ + +-- select from measurements grouped by the day with a timezone +SELECT mean("value") FROM "cpu" GROUP BY region, time(1d) fill(0) tz("America/Chicago") ``` ## Clauses @@ -817,6 +821,8 @@ slimit_clause = "SLIMIT" int_lit . soffset_clause = "SOFFSET" int_lit . +timezone_clause = tz(identifier) . + on_clause = "ON" db_name . order_by_clause = "ORDER BY" sort_fields . diff --git a/influxql/ast.go b/influxql/ast.go index d1e3ee15849..e304fd26fc1 100644 --- a/influxql/ast.go +++ b/influxql/ast.go @@ -984,6 +984,9 @@ type SelectStatement struct { // The value to fill empty aggregate buckets with, if any. FillValue interface{} + // The timezone for the query, if any. + Location *time.Location + // Renames the implicit time field name. TimeAlias string @@ -1650,6 +1653,9 @@ func (s *SelectStatement) String() string { if s.SOffset > 0 { _, _ = fmt.Fprintf(&buf, " SOFFSET %d", s.SOffset) } + if s.Location != nil { + _, _ = fmt.Fprintf(&buf, ` TZ("%s")`, s.Location) + } return buf.String() } diff --git a/influxql/emitter.go b/influxql/emitter.go index ba6b90ed627..625c7ca5eff 100644 --- a/influxql/emitter.go +++ b/influxql/emitter.go @@ -20,6 +20,9 @@ type Emitter struct { // The columns to attach to each row. Columns []string + // The time zone location. + Location *time.Location + // Removes the "time" column from output. // Used for meta queries where time does not apply. OmitTime bool @@ -32,6 +35,7 @@ func NewEmitter(itrs []Iterator, ascending bool, chunkSize int) *Emitter { itrs: itrs, ascending: ascending, chunkSize: chunkSize, + Location: time.UTC, } } @@ -154,7 +158,7 @@ func (e *Emitter) readAt(t int64, name string, tags Tags) []interface{} { values := make([]interface{}, len(e.itrs)+offset) if !e.OmitTime { - values[0] = time.Unix(0, t).UTC() + values[0] = time.Unix(0, t).In(e.Location) } e.readInto(t, name, tags, values[offset:]) return values diff --git a/influxql/internal/internal.pb.go b/influxql/internal/internal.pb.go index 5c9d474dd61..ce8d9c61f29 100644 --- a/influxql/internal/internal.pb.go +++ b/influxql/internal/internal.pb.go @@ -194,6 +194,7 @@ type IteratorOptions struct { Condition *string `protobuf:"bytes,8,opt,name=Condition" json:"Condition,omitempty"` StartTime *int64 `protobuf:"varint,9,opt,name=StartTime" json:"StartTime,omitempty"` EndTime *int64 `protobuf:"varint,10,opt,name=EndTime" json:"EndTime,omitempty"` + Location *string `protobuf:"bytes,21,opt,name=Location" json:"Location,omitempty"` Ascending *bool `protobuf:"varint,11,opt,name=Ascending" json:"Ascending,omitempty"` Limit *int64 `protobuf:"varint,12,opt,name=Limit" json:"Limit,omitempty"` Offset *int64 `protobuf:"varint,13,opt,name=Offset" json:"Offset,omitempty"` @@ -294,6 +295,13 @@ func (m *IteratorOptions) GetEndTime() int64 { return 0 } +func (m *IteratorOptions) GetLocation() string { + if m != nil && m.Location != nil { + return *m.Location + } + return "" +} + func (m *IteratorOptions) GetAscending() bool { if m != nil && m.Ascending != nil { return *m.Ascending @@ -505,44 +513,52 @@ func init() { func init() { proto.RegisterFile("internal/internal.proto", fileDescriptorInternal) } var fileDescriptorInternal = []byte{ - // 609 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 
0x00, 0x02, 0xff, 0x7c, 0x53, 0xe1, 0x4e, 0xdb, 0x30, - 0x10, 0x56, 0xea, 0xa6, 0x4d, 0x2e, 0x2d, 0x6d, 0x0d, 0x13, 0xd6, 0x7e, 0x45, 0x1d, 0x42, 0xf9, - 0x31, 0xb1, 0x09, 0xed, 0x05, 0xca, 0x80, 0xa9, 0xd2, 0x28, 0x88, 0x20, 0xfe, 0x7b, 0xe4, 0x1a, - 0x59, 0x72, 0x9d, 0xce, 0x76, 0xa6, 0xf2, 0x38, 0x7b, 0xbe, 0xbd, 0xc4, 0x64, 0x37, 0xa1, 0x1d, - 0x42, 0xfb, 0x97, 0xfb, 0x7c, 0x77, 0xf9, 0xee, 0xfb, 0xee, 0xe0, 0x58, 0x28, 0x8b, 0x5a, 0x71, - 0xf9, 0xa9, 0xfd, 0x38, 0x5b, 0xeb, 0xca, 0x56, 0x34, 0x12, 0x6a, 0x29, 0xeb, 0xcd, 0x4f, 0x39, - 0xfd, 0x13, 0x40, 0x78, 0x57, 0x09, 0x65, 0xe9, 0x00, 0xba, 0x0b, 0xbe, 0x42, 0x16, 0xa4, 0x9d, - 0x2c, 0x76, 0xd1, 0x03, 0x2f, 0x0d, 0xeb, 0xbc, 0x44, 0x62, 0x85, 0x8c, 0xa4, 0x9d, 0x8c, 0xd0, - 0x04, 0xc8, 0x42, 0x48, 0xd6, 0x4d, 0x3b, 0x59, 0x44, 0xdf, 0x03, 0x99, 0xd5, 0x1b, 0x16, 0xa6, - 0x24, 0x4b, 0xce, 0x87, 0x67, 0x6d, 0xe3, 0xb3, 0x59, 0xbd, 0xa1, 0x14, 0x60, 0x56, 0x96, 0x1a, - 0x4b, 0x6e, 0xb1, 0x60, 0xbd, 0x34, 0xc8, 0x86, 0x0e, 0xbb, 0x96, 0x15, 0xb7, 0x8f, 0x5c, 0xd6, - 0xc8, 0xfa, 0x69, 0x90, 0x05, 0xf4, 0x08, 0x06, 0x73, 0x65, 0xb1, 0x44, 0xbd, 0x45, 0xa3, 0x34, - 0xc8, 0x08, 0x3d, 0x84, 0x24, 0xb7, 0x5a, 0xa8, 0x72, 0x0b, 0xc6, 0x69, 0x90, 0xc5, 0x2e, 0xf5, - 0xa2, 0xaa, 0x24, 0x72, 0xb5, 0x45, 0x21, 0x0d, 0xb2, 0x88, 0x9e, 0x42, 0x98, 0x5b, 0x6e, 0x0d, - 0x4b, 0xd2, 0x20, 0x4b, 0xce, 0x8f, 0x77, 0x34, 0xe6, 0x16, 0x35, 0xb7, 0x95, 0xf6, 0xcf, 0x53, - 0xe9, 0xc9, 0xd2, 0x31, 0x44, 0x97, 0xdc, 0xf2, 0x87, 0xe7, 0xf5, 0x76, 0xdc, 0xf0, 0x15, 0xab, - 0xce, 0x9b, 0xac, 0xc8, 0x5b, 0xac, 0xba, 0x6f, 0xb2, 0x0a, 0x1d, 0xab, 0xe9, 0x6f, 0x02, 0xa3, - 0xf6, 0xff, 0xb7, 0x6b, 0x2b, 0x2a, 0x65, 0x9c, 0x92, 0x57, 0x9b, 0xb5, 0x66, 0x81, 0xaf, 0x4b, - 0xb6, 0xe2, 0x75, 0x52, 0x92, 0xc5, 0x34, 0x85, 0xde, 0xb5, 0x40, 0x59, 0x18, 0x36, 0xf1, 0x62, - 0x8e, 0x77, 0x53, 0x3c, 0x72, 0x7d, 0x8f, 0x4b, 0x7a, 0x0a, 0xfd, 0xbc, 0xaa, 0xf5, 0x13, 0x1a, - 0x46, 0x7c, 0xca, 0xbb, 0x5d, 0xca, 0x0d, 0x72, 0x53, 0x6b, 0x5c, 0xa1, 0xb2, 0xf4, 0x04, 0x22, - 0xc7, 0x5c, 0xff, 0xe2, 0xd2, 0x13, 0x4c, 0xce, 0xe9, 0x9e, 0x22, 0xcd, 0x8b, 0x9b, 0xf9, 0x52, - 0xac, 0x50, 0x19, 0x47, 0xcc, 0x1b, 0x18, 0xd3, 0x11, 0xf4, 0xbf, 0xe9, 0xaa, 0x5e, 0x5f, 0x3c, - 0xb3, 0x43, 0x0f, 0x0c, 0xa0, 0x7b, 0x2d, 0xa4, 0xf4, 0xe6, 0x85, 0x74, 0x02, 0xb1, 0x8b, 0xf6, - 0xbd, 0x9b, 0x40, 0xfc, 0xb5, 0x52, 0x85, 0x70, 0xe3, 0x79, 0xe3, 0x62, 0x07, 0xe5, 0x96, 0x6b, - 0xeb, 0x57, 0x26, 0xf6, 0xaa, 0x8d, 0xa0, 0x7f, 0xa5, 0x0a, 0x0f, 0x80, 0x07, 0x26, 0x10, 0xcf, - 0xcc, 0x13, 0xaa, 0x42, 0xa8, 0xd2, 0xbb, 0x16, 0xd1, 0x21, 0x84, 0xdf, 0xc5, 0x4a, 0x58, 0x36, - 0xf0, 0x19, 0x07, 0xd0, 0xbb, 0x5d, 0x2e, 0x0d, 0x5a, 0x36, 0x6c, 0xe3, 0x7c, 0xfb, 0x7e, 0xd0, - 0xb6, 0xcc, 0x9b, 0x84, 0x51, 0x9b, 0x70, 0x89, 0x45, 0xbd, 0x46, 0x36, 0xf6, 0xfd, 0x28, 0xc0, - 0x0d, 0xdf, 0xe4, 0xa8, 0x05, 0x9a, 0x05, 0xa3, 0x6d, 0xd1, 0xad, 0x2e, 0x50, 0x63, 0xc1, 0x8e, - 0xbc, 0x47, 0x5f, 0x60, 0xb0, 0xa7, 0x9c, 0xa1, 0x27, 0x10, 0xce, 0x2d, 0xae, 0x0c, 0x0b, 0xfe, - 0x23, 0xf0, 0xb4, 0x84, 0x64, 0x5f, 0xef, 0x66, 0x9f, 0x7e, 0x70, 0x83, 0x8d, 0xb1, 0xc7, 0x30, - 0xba, 0x47, 0x8b, 0xca, 0xa9, 0x72, 0x57, 0x49, 0xf1, 0xf4, 0xec, 0x97, 0x2a, 0x7e, 0xb9, 0x32, - 0xe2, 0xa3, 0x21, 0x84, 0xf7, 0x58, 0xe2, 0xa6, 0x59, 0xa3, 0x31, 0x44, 0x73, 0xf3, 0xc0, 0x75, - 0x89, 0xb6, 0x59, 0xa1, 0x8f, 0x3b, 0x27, 0xfd, 0x5f, 0x6a, 0xcd, 0xbd, 0xd0, 0xc1, 0x2b, 0x89, - 0x5c, 0x73, 0x32, 0xfd, 0x0c, 0xc3, 0x7f, 0xf6, 0xdd, 0x6b, 0xd4, 0xcc, 0xff, 0x52, 0xe1, 0xaf, - 0x7d, 0xd1, 0x54, 0x7c, 0x80, 0x5e, 0xb3, 0x5b, 0x09, 0x90, 0x47, 
0x2e, 0xf7, 0xae, 0xdf, 0x1d, - 0x87, 0x4b, 0x0a, 0xff, 0x06, 0x00, 0x00, 0xff, 0xff, 0xbc, 0x54, 0x7d, 0x69, 0x47, 0x04, 0x00, + // 737 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0x84, 0x54, 0xdd, 0x6a, 0xdb, 0x4a, + 0x10, 0x46, 0x92, 0xe5, 0x58, 0xeb, 0xf8, 0x24, 0x67, 0x4f, 0x7e, 0x96, 0xc3, 0xe1, 0x54, 0xe8, + 0x4a, 0x50, 0xea, 0x40, 0x6e, 0x0b, 0x05, 0xa7, 0x49, 0x8a, 0x21, 0x71, 0xc2, 0x2a, 0xe4, 0x7e, + 0x6b, 0x8d, 0xc5, 0x82, 0x2c, 0xb9, 0xab, 0x55, 0x71, 0x1e, 0xa5, 0xcf, 0xd0, 0x87, 0xe9, 0xab, + 0xf4, 0x11, 0xca, 0xce, 0x4a, 0x96, 0x92, 0x42, 0x73, 0xa5, 0xf9, 0xbe, 0x99, 0x1d, 0xed, 0xcc, + 0x37, 0x3b, 0xe4, 0x54, 0x16, 0x1a, 0x54, 0x21, 0xf2, 0xb3, 0xd6, 0x98, 0x6e, 0x54, 0xa9, 0x4b, + 0x3a, 0x92, 0xc5, 0x2a, 0xaf, 0xb7, 0x5f, 0xf2, 0xe8, 0x87, 0x4b, 0xfc, 0xfb, 0x52, 0x16, 0x9a, + 0x52, 0x32, 0x58, 0x88, 0x35, 0x30, 0x27, 0x74, 0xe3, 0x80, 0xa3, 0x6d, 0xb8, 0x07, 0x91, 0x55, + 0xcc, 0xb5, 0x9c, 0xb1, 0x91, 0x93, 0x6b, 0x60, 0x5e, 0xe8, 0xc6, 0x1e, 0x47, 0x9b, 0x1e, 0x12, + 0x6f, 0x21, 0x73, 0x36, 0x08, 0xdd, 0x78, 0xc4, 0x8d, 0x49, 0xdf, 0x10, 0x6f, 0x56, 0x6f, 0x99, + 0x1f, 0x7a, 0xf1, 0xf8, 0x7c, 0x32, 0x6d, 0xff, 0x37, 0x9d, 0xd5, 0x5b, 0x6e, 0x3c, 0xf4, 0x7f, + 0x42, 0x66, 0x59, 0xa6, 0x20, 0x13, 0x1a, 0x52, 0x36, 0x0c, 0x9d, 0x78, 0xc2, 0x7b, 0x8c, 0xf1, + 0x5f, 0xe7, 0xa5, 0xd0, 0x8f, 0x22, 0xaf, 0x81, 0xed, 0x85, 0x4e, 0xec, 0xf0, 0x1e, 0x43, 0x23, + 0xb2, 0x3f, 0x2f, 0x34, 0x64, 0xa0, 0x6c, 0xc4, 0x28, 0x74, 0x62, 0x8f, 0x3f, 0xe3, 0x68, 0x48, + 0xc6, 0x89, 0x56, 0xb2, 0xc8, 0x6c, 0x48, 0x10, 0x3a, 0x71, 0xc0, 0xfb, 0x94, 0xc9, 0x72, 0x51, + 0x96, 0x39, 0x88, 0xc2, 0x86, 0x90, 0xd0, 0x89, 0x47, 0xfc, 0x19, 0x47, 0xdf, 0x11, 0x3f, 0xd1, + 0x42, 0x57, 0x6c, 0x1c, 0x3a, 0xf1, 0xf8, 0xfc, 0xb4, 0x2b, 0x66, 0xae, 0x41, 0x09, 0x5d, 0x2a, + 0x74, 0x73, 0x1b, 0x15, 0x7d, 0x77, 0xb0, 0x74, 0xfa, 0x2f, 0x19, 0x5d, 0x0a, 0x2d, 0x1e, 0x9e, + 0x36, 0xb6, 0xa7, 0x3e, 0xdf, 0xe1, 0x17, 0xc5, 0xb9, 0xaf, 0x16, 0xe7, 0xbd, 0x5e, 0xdc, 0xe0, + 0xf5, 0xe2, 0xfc, 0xdf, 0x8b, 0x8b, 0x7e, 0x0e, 0xc8, 0x41, 0x5b, 0xc6, 0xdd, 0x46, 0xcb, 0xb2, + 0x40, 0x85, 0xaf, 0xb6, 0x1b, 0xc5, 0x1c, 0x4c, 0x89, 0xb6, 0x51, 0xd8, 0xe8, 0xe9, 0x86, 0x5e, + 0x1c, 0x58, 0x01, 0x63, 0x32, 0xbc, 0x96, 0x90, 0xa7, 0x15, 0xfb, 0x1b, 0x45, 0x3e, 0xec, 0xfa, + 0xf2, 0x28, 0x14, 0x87, 0x15, 0x6f, 0xfc, 0xf4, 0x8c, 0xec, 0x25, 0x65, 0xad, 0x96, 0x50, 0x31, + 0x0f, 0x43, 0x8f, 0xbb, 0xd0, 0x5b, 0x10, 0x55, 0xad, 0x60, 0x0d, 0x85, 0xe6, 0x6d, 0x14, 0x9d, + 0x92, 0x91, 0x29, 0x55, 0x7d, 0x15, 0x39, 0xd6, 0x35, 0x3e, 0xa7, 0xbd, 0xa6, 0x37, 0x1e, 0xbe, + 0x8b, 0x31, 0xed, 0xbc, 0x94, 0x6b, 0x28, 0x2a, 0x73, 0x7d, 0x9c, 0xb9, 0x80, 0xf7, 0x18, 0xca, + 0xc8, 0xde, 0x27, 0x55, 0xd6, 0x9b, 0x8b, 0x27, 0xf6, 0x0f, 0x3a, 0x5b, 0x68, 0x4a, 0xbd, 0x96, + 0x79, 0x8e, 0xf3, 0xe7, 0x73, 0xb4, 0xe9, 0x7f, 0x24, 0x30, 0xdf, 0xfe, 0xe0, 0x75, 0x84, 0xf1, + 0x7e, 0x2c, 0x8b, 0x54, 0x9a, 0x56, 0xe1, 0xd0, 0x05, 0xbc, 0x23, 0x8c, 0x37, 0xd1, 0x42, 0x69, + 0x7c, 0x21, 0x01, 0xaa, 0xd6, 0x11, 0xe6, 0x1e, 0x57, 0x45, 0x8a, 0x3e, 0x82, 0xbe, 0x16, 0x9a, + 0x61, 0xb9, 0x29, 0x97, 0x02, 0x93, 0x1e, 0x63, 0xd2, 0x1d, 0x36, 0x39, 0x67, 0xd5, 0x12, 0x8a, + 0x54, 0x16, 0x19, 0xce, 0xe0, 0x88, 0x77, 0x04, 0x3d, 0x22, 0xfe, 0x8d, 0x5c, 0x4b, 0xcd, 0xf6, + 0x31, 0xa3, 0x05, 0xf4, 0x84, 0x0c, 0xef, 0x56, 0xab, 0x0a, 0x34, 0x9b, 0x20, 0xdd, 0x20, 0xc3, + 0x27, 0x36, 0xfc, 0x2f, 0xcb, 0x5b, 0x64, 0x6e, 0x96, 0x34, 0x07, 0x0e, 0xec, 0xcd, 0x92, 0xee, + 0xc4, 0x25, 0xa4, 0xf5, 0x06, 0xd8, 0x21, 
0xfe, 0xba, 0x41, 0xa6, 0xe7, 0xb7, 0x62, 0x9b, 0x80, + 0x92, 0x50, 0x2d, 0x18, 0xc5, 0x43, 0x3d, 0xc6, 0x64, 0xbc, 0x53, 0x29, 0x28, 0x48, 0xd9, 0x11, + 0x1e, 0x6c, 0x61, 0xf4, 0x9e, 0xec, 0xf7, 0x54, 0xaf, 0xe8, 0x5b, 0xe2, 0xcf, 0x35, 0xac, 0x2b, + 0xe6, 0xfc, 0x69, 0x38, 0x6c, 0x4c, 0xf4, 0xcd, 0x21, 0xe3, 0x1e, 0xdd, 0xbe, 0xb2, 0xcf, 0xa2, + 0x82, 0x66, 0x5e, 0x77, 0x98, 0xc6, 0xe4, 0x80, 0x83, 0x86, 0xc2, 0x74, 0xf1, 0xbe, 0xcc, 0xe5, + 0xf2, 0x09, 0x9f, 0x5a, 0xc0, 0x5f, 0xd2, 0xbb, 0xdd, 0xe7, 0xd9, 0x89, 0xc7, 0xdd, 0x77, 0x44, + 0x7c, 0x0e, 0x19, 0x6c, 0x9b, 0x97, 0x65, 0x81, 0xf9, 0xdf, 0xbc, 0x7a, 0x10, 0x2a, 0x03, 0xdd, + 0xbc, 0xa7, 0x1d, 0x8e, 0x3e, 0x74, 0x63, 0x8b, 0xf7, 0xaa, 0x95, 0x15, 0xd4, 0xc1, 0xe6, 0xec, + 0x70, 0x4f, 0x1c, 0xb7, 0x2f, 0x4e, 0x34, 0x23, 0x93, 0x67, 0x1b, 0x05, 0x55, 0x69, 0x1a, 0xec, + 0x34, 0xaa, 0x34, 0xdd, 0x3d, 0x21, 0x43, 0xdc, 0xda, 0x8b, 0x36, 0x85, 0x45, 0xd1, 0x94, 0x0c, + 0xed, 0xe3, 0x33, 0x0f, 0xf6, 0x51, 0xe4, 0xcd, 0x36, 0x37, 0x26, 0x2e, 0x6e, 0xb3, 0x8c, 0x5c, + 0x3b, 0xeb, 0xc6, 0xfe, 0x15, 0x00, 0x00, 0xff, 0xff, 0xca, 0x3e, 0x5e, 0x08, 0x22, 0x06, 0x00, 0x00, } diff --git a/influxql/internal/internal.proto b/influxql/internal/internal.proto index cbbc976d7d8..158372b809d 100644 --- a/influxql/internal/internal.proto +++ b/influxql/internal/internal.proto @@ -38,6 +38,7 @@ message IteratorOptions { optional string Condition = 8; optional int64 StartTime = 9; optional int64 EndTime = 10; + optional string Location = 21; optional bool Ascending = 11; optional int64 Limit = 12; optional int64 Offset = 13; diff --git a/influxql/iterator.gen.go b/influxql/iterator.gen.go index 03f0fc7cca5..19355ca03b5 100644 --- a/influxql/iterator.gen.go +++ b/influxql/iterator.gen.go @@ -573,9 +573,10 @@ type floatFillIterator struct { opt IteratorOptions window struct { - name string - tags Tags - time int64 + name string + tags Tags + time int64 + offset int64 } } @@ -622,6 +623,9 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) { } itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.init = true } @@ -657,6 +661,9 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) { // Set the new interval. itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.prev = FloatPoint{Nil: true} break } @@ -711,9 +718,21 @@ func (itr *floatFillIterator) Next() (*FloatPoint, error) { // as there may be lingering points with the same timestamp in the previous // window. if itr.opt.Ascending { - itr.window.time = p.Time + int64(itr.opt.Interval.Duration) + itr.window.time += int64(itr.opt.Interval.Duration) } else { - itr.window.time = p.Time - int64(itr.opt.Interval.Duration) + itr.window.time -= int64(itr.opt.Interval.Duration) + } + + // Check to see if we have passed over an offset change and adjust the time + // to account for this new offset. 
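+	// For example, a 1d interval in America/Los_Angeles shifts the next window
+	// boundary by one hour when it crosses a daylight saving time change,
+	// yielding one 25 hour day and one 23 hour day per year. The shift is
+	// skipped when the offset change is as large as the interval itself, so
+	// grouping by an hour or less is unaffected.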
+ if itr.opt.Location != nil { + if _, offset := itr.opt.Zone(itr.window.time); offset != itr.window.offset { + diff := itr.window.offset - offset + if abs(diff) < int64(itr.opt.Interval.Duration) { + itr.window.time += diff + } + itr.window.offset = offset + } } return p, nil } @@ -3230,9 +3249,10 @@ type integerFillIterator struct { opt IteratorOptions window struct { - name string - tags Tags - time int64 + name string + tags Tags + time int64 + offset int64 } } @@ -3279,6 +3299,9 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) { } itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.init = true } @@ -3314,6 +3337,9 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) { // Set the new interval. itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.prev = IntegerPoint{Nil: true} break } @@ -3368,9 +3394,21 @@ func (itr *integerFillIterator) Next() (*IntegerPoint, error) { // as there may be lingering points with the same timestamp in the previous // window. if itr.opt.Ascending { - itr.window.time = p.Time + int64(itr.opt.Interval.Duration) + itr.window.time += int64(itr.opt.Interval.Duration) } else { - itr.window.time = p.Time - int64(itr.opt.Interval.Duration) + itr.window.time -= int64(itr.opt.Interval.Duration) + } + + // Check to see if we have passed over an offset change and adjust the time + // to account for this new offset. + if itr.opt.Location != nil { + if _, offset := itr.opt.Zone(itr.window.time); offset != itr.window.offset { + diff := itr.window.offset - offset + if abs(diff) < int64(itr.opt.Interval.Duration) { + itr.window.time += diff + } + itr.window.offset = offset + } } return p, nil } @@ -5884,9 +5922,10 @@ type stringFillIterator struct { opt IteratorOptions window struct { - name string - tags Tags - time int64 + name string + tags Tags + time int64 + offset int64 } } @@ -5933,6 +5972,9 @@ func (itr *stringFillIterator) Next() (*StringPoint, error) { } itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.init = true } @@ -5968,6 +6010,9 @@ func (itr *stringFillIterator) Next() (*StringPoint, error) { // Set the new interval. itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.prev = StringPoint{Nil: true} break } @@ -6008,9 +6053,21 @@ func (itr *stringFillIterator) Next() (*StringPoint, error) { // as there may be lingering points with the same timestamp in the previous // window. if itr.opt.Ascending { - itr.window.time = p.Time + int64(itr.opt.Interval.Duration) + itr.window.time += int64(itr.opt.Interval.Duration) } else { - itr.window.time = p.Time - int64(itr.opt.Interval.Duration) + itr.window.time -= int64(itr.opt.Interval.Duration) + } + + // Check to see if we have passed over an offset change and adjust the time + // to account for this new offset. 
+ if itr.opt.Location != nil { + if _, offset := itr.opt.Zone(itr.window.time); offset != itr.window.offset { + diff := itr.window.offset - offset + if abs(diff) < int64(itr.opt.Interval.Duration) { + itr.window.time += diff + } + itr.window.offset = offset + } } return p, nil } @@ -8524,9 +8581,10 @@ type booleanFillIterator struct { opt IteratorOptions window struct { - name string - tags Tags - time int64 + name string + tags Tags + time int64 + offset int64 } } @@ -8573,6 +8631,9 @@ func (itr *booleanFillIterator) Next() (*BooleanPoint, error) { } itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.init = true } @@ -8608,6 +8669,9 @@ func (itr *booleanFillIterator) Next() (*BooleanPoint, error) { // Set the new interval. itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.prev = BooleanPoint{Nil: true} break } @@ -8648,9 +8712,21 @@ func (itr *booleanFillIterator) Next() (*BooleanPoint, error) { // as there may be lingering points with the same timestamp in the previous // window. if itr.opt.Ascending { - itr.window.time = p.Time + int64(itr.opt.Interval.Duration) + itr.window.time += int64(itr.opt.Interval.Duration) } else { - itr.window.time = p.Time - int64(itr.opt.Interval.Duration) + itr.window.time -= int64(itr.opt.Interval.Duration) + } + + // Check to see if we have passed over an offset change and adjust the time + // to account for this new offset. + if itr.opt.Location != nil { + if _, offset := itr.opt.Zone(itr.window.time); offset != itr.window.offset { + diff := itr.window.offset - offset + if abs(diff) < int64(itr.opt.Interval.Duration) { + itr.window.time += diff + } + itr.window.offset = offset + } } return p, nil } diff --git a/influxql/iterator.gen.go.tmpl b/influxql/iterator.gen.go.tmpl index d8040f5d89b..eb16d59788f 100644 --- a/influxql/iterator.gen.go.tmpl +++ b/influxql/iterator.gen.go.tmpl @@ -571,9 +571,10 @@ type {{$k.name}}FillIterator struct { opt IteratorOptions window struct { - name string - tags Tags - time int64 + name string + tags Tags + time int64 + offset int64 } } @@ -620,6 +621,9 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) { } itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.init = true } @@ -655,6 +659,9 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) { // Set the new interval. itr.window.name, itr.window.tags = p.Name, p.Tags itr.window.time = itr.startTime + if itr.opt.Location != nil { + _, itr.window.offset = itr.opt.Zone(itr.window.time) + } itr.prev = {{$k.Name}}Point{Nil: true} break } @@ -712,9 +719,21 @@ func (itr *{{$k.name}}FillIterator) Next() (*{{$k.Name}}Point, error) { // as there may be lingering points with the same timestamp in the previous // window. if itr.opt.Ascending { - itr.window.time = p.Time + int64(itr.opt.Interval.Duration) + itr.window.time += int64(itr.opt.Interval.Duration) } else { - itr.window.time = p.Time - int64(itr.opt.Interval.Duration) + itr.window.time -= int64(itr.opt.Interval.Duration) + } + + // Check to see if we have passed over an offset change and adjust the time + // to account for this new offset. 
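+	// iterator.gen.go.tmpl is the source for the generated float, integer,
+	// string, and boolean fill iterators, so this same adjustment is emitted
+	// for every point type.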
+ if itr.opt.Location != nil { + if _, offset := itr.opt.Zone(itr.window.time); offset != itr.window.offset { + diff := itr.window.offset - offset + if abs(diff) < int64(itr.opt.Interval.Duration) { + itr.window.time += diff + } + itr.window.offset = offset + } } return p, nil } diff --git a/influxql/iterator.go b/influxql/iterator.go index 901f50f52c2..cdaef7d9664 100644 --- a/influxql/iterator.go +++ b/influxql/iterator.go @@ -26,6 +26,9 @@ const ( // MaxTime is used as the maximum time value when computing an unbounded range. // This time is 2262-04-11 23:47:16.854775806 +0000 UTC MaxTime = models.MaxNanoTime + + // secToNs is the number of nanoseconds in a second. + secToNs = int64(time.Second) ) // Iterator represents a generic interface for all Iterators. @@ -657,6 +660,7 @@ type IteratorOptions struct { Interval Interval Dimensions []string // The final dimensions of the query (stays the same even in subqueries). GroupBy map[string]struct{} // Dimensions to group points by in intermediate iterators. + Location *time.Location // Fill options. Fill FillOption @@ -718,6 +722,7 @@ func newIteratorOptionsStmt(stmt *SelectStatement, sopt *SelectOptions) (opt Ite opt.EndTime = MaxTime } } + opt.Location = stmt.Location // Determine group by interval. interval, err := stmt.GroupByInterval() @@ -839,6 +844,13 @@ func (opt IteratorOptions) Window(t int64) (start, end int64) { // Subtract the offset to the time so we calculate the correct base interval. t -= int64(opt.Interval.Offset) + // Retrieve the zone offset for the start time. + var startOffset int64 + if opt.Location != nil { + _, startOffset = opt.Zone(t) + t += startOffset + } + // Truncate time by duration. dt := t % int64(opt.Interval.Duration) if dt < 0 { @@ -848,9 +860,37 @@ func (opt IteratorOptions) Window(t int64) (start, end int64) { } t -= dt + // Look for the start offset again because the first time may have been + // after the offset switch. Now that we are at midnight in UTC, we can + // lookup the zone offset again to get the real starting offset. + if opt.Location != nil { + _, adjustedOffset := opt.Zone(t) + // Do not adjust the offset if the offset change is greater than or + // equal to the duration. + if o := startOffset - adjustedOffset; o != 0 && abs(o) < int64(opt.Interval.Duration) { + startOffset = adjustedOffset + } + } + // Apply the offset. - start = t + int64(opt.Interval.Offset) + start = t + int64(opt.Interval.Offset) - startOffset end = start + int64(opt.Interval.Duration) + + // Retrieve the zone offset for the end time. + if opt.Location != nil { + _, endOffset := opt.Zone(end) + // Adjust the end time if the offset is different from the start offset. + if startOffset != endOffset { + offset := startOffset - endOffset + + // Only apply the offset if it is smaller than the duration. + // This prevents going back in time and creating time windows + // that don't make any sense. + if abs(offset) < int64(opt.Interval.Duration) { + end += offset + } + } + } return } @@ -891,6 +931,17 @@ func (opt IteratorOptions) GetDimensions() []string { return opt.Dimensions } +// Zone returns the zone information for the given time. The offset is in nanoseconds. +func (opt *IteratorOptions) Zone(ns int64) (string, int64) { + if opt.Location == nil { + return "", 0 + } + + t := time.Unix(0, ns).In(opt.Location) + name, offset := t.Zone() + return name, secToNs * int64(offset) +} + // MarshalBinary encodes opt into a binary format. 
func (opt *IteratorOptions) MarshalBinary() ([]byte, error) { return proto.Marshal(encodeIteratorOptions(opt)) @@ -934,6 +985,11 @@ func encodeIteratorOptions(opt *IteratorOptions) *internal.IteratorOptions { pb.Expr = proto.String(opt.Expr.String()) } + // Set the location, if set. + if opt.Location != nil { + pb.Location = proto.String(opt.Location.String()) + } + // Convert and encode aux fields as variable references. pb.Fields = make([]*internal.VarRef, len(opt.Aux)) pb.Aux = make([]string, len(opt.Aux)) @@ -1001,6 +1057,14 @@ func decodeIteratorOptions(pb *internal.IteratorOptions) (*IteratorOptions, erro opt.Expr = expr } + if pb.Location != nil { + loc, err := time.LoadLocation(pb.GetLocation()) + if err != nil { + return nil, err + } + opt.Location = loc + } + // Convert and decode variable references. if fields := pb.GetFields(); fields != nil { opt.Aux = make([]VarRef, len(fields)) @@ -1272,3 +1336,10 @@ type reverseStringSlice []string func (p reverseStringSlice) Len() int { return len(p) } func (p reverseStringSlice) Less(i, j int) bool { return p[i] > p[j] } func (p reverseStringSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +func abs(v int64) int64 { + if v < 0 { + return -v + } + return v +} diff --git a/influxql/iterator_test.go b/influxql/iterator_test.go index 63f9d2ee640..dbebac6f858 100644 --- a/influxql/iterator_test.go +++ b/influxql/iterator_test.go @@ -705,6 +705,238 @@ func TestLimitIterator(t *testing.T) { } } +func TestFillIterator_DST_Start_GroupByDay_Ascending(t *testing.T) { + start := time.Date(2000, 4, 1, 0, 0, 0, 0, LosAngeles) + end := time.Date(2000, 4, 5, 0, 0, 0, 0, LosAngeles).Add(-time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: 24 * time.Hour, + }, + Location: LosAngeles, + Ascending: true, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + {&influxql.FloatPoint{Time: start.Add(24 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(47 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(71 * time.Hour).UnixNano(), Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_Start_GroupByDay_Descending(t *testing.T) { + start := time.Date(2000, 4, 1, 0, 0, 0, 0, LosAngeles) + end := time.Date(2000, 4, 5, 0, 0, 0, 0, LosAngeles).Add(-time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: 24 * time.Hour, + }, + Location: LosAngeles, + Ascending: false, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.Add(71 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(47 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(24 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) 
+ } +} + +func TestFillIterator_DST_Start_GroupByHour_Ascending(t *testing.T) { + start := time.Date(2000, 4, 2, 0, 0, 0, 0, LosAngeles) + end := start.Add(4*time.Hour - time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: time.Hour, + }, + Location: LosAngeles, + Ascending: true, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + {&influxql.FloatPoint{Time: start.Add(1 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(2 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(3 * time.Hour).UnixNano(), Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_Start_GroupByHour_Descending(t *testing.T) { + start := time.Date(2000, 4, 2, 0, 0, 0, 0, LosAngeles) + end := start.Add(4*time.Hour - time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: time.Hour, + }, + Location: LosAngeles, + Ascending: false, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.Add(3 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(2 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(1 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_End_GroupByDay_Ascending(t *testing.T) { + start := time.Date(2000, 10, 28, 0, 0, 0, 0, LosAngeles) + end := time.Date(2000, 11, 1, 0, 0, 0, 0, LosAngeles).Add(-time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: 24 * time.Hour, + }, + Location: LosAngeles, + Ascending: true, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + {&influxql.FloatPoint{Time: start.Add(24 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(49 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(73 * time.Hour).UnixNano(), Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_End_GroupByDay_Descending(t *testing.T) { + start := time.Date(2000, 10, 28, 0, 0, 0, 0, LosAngeles) + end := time.Date(2000, 11, 1, 0, 0, 0, 0, LosAngeles).Add(-time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: 24 * 
time.Hour, + }, + Location: LosAngeles, + Ascending: false, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.Add(73 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(49 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(24 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_End_GroupByHour_Ascending(t *testing.T) { + start := time.Date(2000, 10, 29, 0, 0, 0, 0, LosAngeles) + end := start.Add(4*time.Hour - time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: time.Hour, + }, + Location: LosAngeles, + Ascending: true, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + {&influxql.FloatPoint{Time: start.Add(1 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(2 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(3 * time.Hour).UnixNano(), Nil: true}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + +func TestFillIterator_DST_End_GroupByHour_Descending(t *testing.T) { + start := time.Date(2000, 10, 29, 0, 0, 0, 0, LosAngeles) + end := start.Add(4*time.Hour - time.Nanosecond) + itr := influxql.NewFillIterator( + &FloatIterator{Points: []influxql.FloatPoint{{Time: start.UnixNano(), Value: 0}}}, + nil, + influxql.IteratorOptions{ + StartTime: start.UnixNano(), + EndTime: end.UnixNano(), + Interval: influxql.Interval{ + Duration: time.Hour, + }, + Location: LosAngeles, + Ascending: false, + }, + ) + + if a, err := (Iterators{itr}).ReadAll(); err != nil { + t.Fatalf("unexpected error: %s", err) + } else if !deep.Equal(a, [][]influxql.Point{ + {&influxql.FloatPoint{Time: start.Add(3 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(2 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.Add(1 * time.Hour).UnixNano(), Nil: true}}, + {&influxql.FloatPoint{Time: start.UnixNano(), Value: 0}}, + }) { + t.Fatalf("unexpected points: %s", spew.Sdump(a)) + } +} + // Iterators is a test wrapper for iterators. 
type Iterators []influxql.Iterator @@ -814,6 +1046,27 @@ func TestIteratorOptions_Window_Default(t *testing.T) { } } +func TestIteratorOptions_Window_Location(t *testing.T) { + now := time.Date(2000, 4, 2, 12, 14, 15, 0, LosAngeles) + opt := influxql.IteratorOptions{ + Location: LosAngeles, + Interval: influxql.Interval{ + Duration: 24 * time.Hour, + }, + } + + start, end := opt.Window(now.UnixNano()) + if exp := time.Date(2000, 4, 2, 0, 0, 0, 0, LosAngeles).UnixNano(); start != exp { + t.Errorf("expected start to be %d, got %d", exp, start) + } + if exp := time.Date(2000, 4, 3, 0, 0, 0, 0, LosAngeles).UnixNano(); end != exp { + t.Errorf("expected end to be %d, got %d", exp, end) + } + if got, exp := time.Duration(end-start), 23*time.Hour; got != exp { + t.Errorf("expected duration to be %s, got %s", exp, got) + } +} + func TestIteratorOptions_SeekTime_Ascending(t *testing.T) { opt := influxql.IteratorOptions{ StartTime: 30, diff --git a/influxql/parser.go b/influxql/parser.go index 2a1be3edb61..31a71cf9a81 100644 --- a/influxql/parser.go +++ b/influxql/parser.go @@ -956,6 +956,11 @@ func (p *Parser) parseSelectStatement(tr targetRequirement) (*SelectStatement, e return nil, err } + // Parse timezone: "TZ()". + if stmt.Location, err = p.parseLocation(); err != nil { + return nil, err + } + // Set if the query is a raw data query or one with an aggregate stmt.IsRawQuery = true WalkFunc(stmt.Fields, func(n Node) { @@ -2217,6 +2222,39 @@ func (p *Parser) parseFill() (FillOption, interface{}, error) { } } +// parseLocation parses the timezone call and its arguments. +func (p *Parser) parseLocation() (*time.Location, error) { + // Parse the expression first. + tok, _, lit := p.scanIgnoreWhitespace() + p.unscan() + if tok != IDENT || strings.ToLower(lit) != "tz" { + return nil, nil + } + + expr, err := p.ParseExpr() + if err != nil { + return nil, err + } + tz, ok := expr.(*Call) + if !ok { + return nil, errors.New("tz must be a function call") + } else if len(tz.Args) != 1 { + return nil, errors.New("tz requires exactly one argument") + } + + tzname, ok := tz.Args[0].(*VarRef) + if !ok { + return nil, errors.New("expected ident argument in tz()") + } + + loc, err := time.LoadLocation(tzname.Val) + if err != nil { + // Do not pass the same error message as the error may contain sensitive pathnames. + return nil, fmt.Errorf("unable to find time zone %s", tzname.Val) + } + return loc, nil +} + // parseOptionalTokenAndInt parses the specified token followed // by an int, if it exists. 
func (p *Parser) parseOptionalTokenAndInt(t Token) (int, error) { diff --git a/influxql/parser_test.go b/influxql/parser_test.go index d8064e26701..eed79bb1ef2 100644 --- a/influxql/parser_test.go +++ b/influxql/parser_test.go @@ -1358,6 +1358,35 @@ func TestParser_ParseStatement(t *testing.T) { }, }, + // SELECT statement with a time zone + { + s: `SELECT mean(value) FROM cpu WHERE time >= now() - 7d GROUP BY time(1d) TZ("America/Los_Angeles")`, + stmt: &influxql.SelectStatement{ + Fields: []*influxql.Field{{ + Expr: &influxql.Call{ + Name: "mean", + Args: []influxql.Expr{ + &influxql.VarRef{Val: "value"}}, + }}}, + Sources: []influxql.Source{&influxql.Measurement{Name: "cpu"}}, + Condition: &influxql.BinaryExpr{ + Op: influxql.GTE, + LHS: &influxql.VarRef{Val: "time"}, + RHS: &influxql.BinaryExpr{ + Op: influxql.SUB, + LHS: &influxql.Call{Name: "now"}, + RHS: &influxql.DurationLiteral{Val: 7 * 24 * time.Hour}, + }, + }, + Dimensions: []*influxql.Dimension{{ + Expr: &influxql.Call{ + Name: "time", + Args: []influxql.Expr{ + &influxql.DurationLiteral{Val: 24 * time.Hour}}}}}, + Location: LosAngeles, + }, + }, + // See issues https://github.com/influxdata/influxdb/issues/1647 // and https://github.com/influxdata/influxdb/issues/4404 // DELETE statement @@ -3361,6 +3390,16 @@ func mustParseDuration(s string) time.Duration { return d } +func mustLoadLocation(s string) *time.Location { + l, err := time.LoadLocation(s) + if err != nil { + panic(err) + } + return l +} + +var LosAngeles = mustLoadLocation("America/Los_Angeles") + func duration(v time.Duration) *time.Duration { return &v } diff --git a/tests/server_helpers.go b/tests/server_helpers.go index b08bdd89c6e..592aa935e83 100644 --- a/tests/server_helpers.go +++ b/tests/server_helpers.go @@ -492,6 +492,16 @@ func mustParseTime(layout, value string) time.Time { return tm } +func mustParseLocation(tzname string) *time.Location { + loc, err := time.LoadLocation(tzname) + if err != nil { + panic(err) + } + return loc +} + +var LosAngeles = mustParseLocation("America/Los_Angeles") + // MustReadAll reads r. Panic on error. func MustReadAll(r io.Reader) []byte { b, err := ioutil.ReadAll(r) diff --git a/tests/server_test.go b/tests/server_test.go index f69835b93a8..720884a7eef 100644 --- a/tests/server_test.go +++ b/tests/server_test.go @@ -6071,6 +6071,101 @@ func TestServer_Query_Fill(t *testing.T) { } } +func TestServer_Query_TimeZone(t *testing.T) { + t.Parallel() + s := OpenServer(NewConfig()) + defer s.Close() + + if err := s.CreateDatabaseAndRetentionPolicy("db0", newRetentionPolicySpec("rp0", 1, 0), true); err != nil { + t.Fatal(err) + } + + var writes []string + for _, start := range []time.Time{ + // One day before DST starts. + time.Date(2000, 4, 1, 0, 0, 0, 0, LosAngeles), + // Middle of DST. No change. + time.Date(2000, 6, 1, 0, 0, 0, 0, LosAngeles), + // One day before DST ends. + time.Date(2000, 10, 28, 0, 0, 0, 0, LosAngeles), + } { + ts := start + // Write every hour for 4 days. + for i := 0; i < 24*4; i++ { + writes = append(writes, fmt.Sprintf(`cpu,interval=daily value=0 %d`, ts.UnixNano())) + ts = ts.Add(time.Hour) + } + + // Write every 5 minutes for 3 hours. Start at 1 on the day with DST. 
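+		// The first day in each range is a normal 24 hour day, so adding 25 hours
+		// lands at 1am local time on the second day, which is the DST transition
+		// day in the April and October cases.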
+ ts = start.Add(25 * time.Hour) + for i := 0; i < 12*3; i++ { + writes = append(writes, fmt.Sprintf(`cpu,interval=hourly value=0 %d`, ts.UnixNano())) + ts = ts.Add(5 * time.Minute) + } + } + + test := NewTest("db0", "rp0") + test.writes = Writes{ + &Write{data: strings.Join(writes, "\n")}, + } + + test.addQueries([]*Query{ + &Query{ + name: "timezone offset - dst start - daily", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-04-02T00:00:00-08:00' AND time < '2000-04-04T00:00:00-07:00' AND interval = 'daily' GROUP BY time(1d) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-04-02T00:00:00-08:00",23],["2000-04-03T00:00:00-07:00",24]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "timezone offset - no change - daily", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-06-01T00:00:00-07:00' AND time < '2000-06-03T00:00:00-07:00' AND interval = 'daily' GROUP BY time(1d) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-06-01T00:00:00-07:00",24],["2000-06-02T00:00:00-07:00",24]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "timezone offset - dst end - daily", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-10-29T00:00:00-07:00' AND time < '2000-10-31T00:00:00-08:00' AND interval = 'daily' GROUP BY time(1d) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-10-29T00:00:00-07:00",25],["2000-10-30T00:00:00-08:00",24]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "timezone offset - dst start - hourly", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-04-02T01:00:00-08:00' AND time < '2000-04-02T04:00:00-07:00' AND interval = 'hourly' GROUP BY time(1h) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-04-02T01:00:00-08:00",12],["2000-04-02T03:00:00-07:00",12]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "timezone offset - no change - hourly", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-06-02T01:00:00-07:00' AND time < '2000-06-02T03:00:00-07:00' AND interval = 'hourly' GROUP BY time(1h) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-06-02T01:00:00-07:00",12],["2000-06-02T02:00:00-07:00",12]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + &Query{ + name: "timezone offset - dst end - hourly", + command: `SELECT count(value) FROM cpu WHERE time >= '2000-10-29T01:00:00-07:00' AND time < '2000-10-29T02:00:00-08:00' AND interval = 'hourly' GROUP BY time(1h) TZ("America/Los_Angeles")`, + exp: `{"results":[{"statement_id":0,"series":[{"name":"cpu","columns":["time","count"],"values":[["2000-10-29T01:00:00-07:00",12],["2000-10-29T01:00:00-08:00",12]]}]}]}`, + params: url.Values{"db": []string{"db0"}}, + }, + }...) 
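+	// The daily queries above expect 23 and 25 points in the buckets that span
+	// the DST transitions (one point written per hour), while the hourly
+	// queries always expect 12 points (one write every 5 minutes).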
+ + for i, query := range test.queries { + if i == 0 { + if err := test.init(s); err != nil { + t.Fatalf("test init failed: %s", err) + } + } + if query.skip { + t.Logf("SKIP:: %s", query.name) + continue + } + if err := query.Execute(s); err != nil { + t.Error(query.Error(err)) + } else if !query.success() { + t.Error(query.failureMessage()) + } + } +} + func TestServer_Query_Chunk(t *testing.T) { t.Parallel() s := OpenServer(NewConfig())