Skip to content

Commit

Permalink
reduce memory allocations in index
Browse files Browse the repository at this point in the history
This commit changes the index to point to index data in the shards
instead of keeping it in-memory on the heap.
  • Loading branch information
benbjohnson committed Aug 16, 2016
1 parent 35f2fda commit 8aa224b
Show file tree
Hide file tree
Showing 38 changed files with 665 additions and 346 deletions.
2 changes: 1 addition & 1 deletion client/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ func (p *Point) MarshalJSON() ([]byte, error) {
// MarshalString renders string representation of a Point with specified
// precision. The default precision is nanoseconds.
func (p *Point) MarshalString() string {
pt, err := models.NewPoint(p.Measurement, p.Tags, p.Fields, p.Time)
pt, err := models.NewPoint(p.Measurement, models.NewTags(p.Tags), p.Fields, p.Time)
if err != nil {
return "# ERROR: " + err.Error() + " " + p.Measurement
}
Expand Down
4 changes: 2 additions & 2 deletions client/v2/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ func NewPoint(
T = t[0]
}

pt, err := models.NewPoint(name, tags, fields, T)
pt, err := models.NewPoint(name, models.NewTags(tags), fields, T)
if err != nil {
return nil, err
}
Expand All @@ -382,7 +382,7 @@ func (p *Point) Name() string {

// Tags returns the tags associated with the point
func (p *Point) Tags() map[string]string {
return p.pt.Tags()
return p.pt.Tags().Map()
}

// Time return the timestamp for the point
Expand Down
2 changes: 1 addition & 1 deletion cmd/influx_inspect/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (c *cmdExport) writeFiles() error {
for i := 0; i < reader.KeyCount(); i++ {
var pairs string
key, typ := reader.KeyAt(i)
values, _ := reader.ReadAll(key)
values, _ := reader.ReadAll(string(key))
measurement, field := tsm1.SeriesAndFieldFromCompositeKey(key)

for _, value := range values {
Expand Down
10 changes: 5 additions & 5 deletions cmd/influx_inspect/report.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func cmdReport(opts *reportOpts) {
totalSeries.Add([]byte(key))

if opts.detailed {
sep := strings.Index(key, "#!~#")
sep := strings.Index(string(key), "#!~#")
seriesKey, field := key[:sep], key[sep+4:]
measurement, tags, _ := models.ParseKey(seriesKey)

Expand All @@ -96,13 +96,13 @@ func cmdReport(opts *reportOpts) {
}
fieldCount.Add([]byte(field))

for t, v := range tags {
tagCount, ok := tagCardialities[t]
for _, t := range tags {
tagCount, ok := tagCardialities[string(t.Key)]
if !ok {
tagCount = hllpp.New()
tagCardialities[t] = tagCount
tagCardialities[string(t.Key)] = tagCount
}
tagCount.Add([]byte(v))
tagCount.Add(t.Value)
}
}
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/influx_inspect/tsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,9 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
var pos int
for i := 0; i < keyCount; i++ {
key, _ := r.KeyAt(i)
for _, e := range r.Entries(key) {
for _, e := range r.Entries(string(key)) {
pos++
split := strings.Split(key, "#!~#")
split := strings.Split(string(key), "#!~#")

// We dont' know know if we have fields so use an informative default
var measurement, field string = "UNKNOWN", "UNKNOWN"
Expand All @@ -132,7 +132,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
measurement = split[0]
field = split[1]

if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) {
if opts.filterKey != "" && !strings.Contains(string(key), opts.filterKey) {
continue
}
fmt.Fprintln(tw, " "+strings.Join([]string{
Expand Down Expand Up @@ -160,7 +160,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {
// Start at the beginning and read every block
for j := 0; j < keyCount; j++ {
key, _ := r.KeyAt(j)
for _, e := range r.Entries(key) {
for _, e := range r.Entries(string(key)) {

f.Seek(int64(e.Offset), 0)
f.Read(b[:4])
Expand All @@ -172,7 +172,7 @@ func cmdDumpTsm1(opts *tsdmDumpOpts) {

blockSize += int64(e.Size)

if opts.filterKey != "" && !strings.Contains(key, opts.filterKey) {
if opts.filterKey != "" && !strings.Contains(string(key), opts.filterKey) {
i += blockSize
blockCount++
continue
Expand Down
4 changes: 2 additions & 2 deletions coordinator/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type WritePointsRequest struct {
// AddPoint adds a point to the WritePointRequest with field key 'value'
func (w *WritePointsRequest) AddPoint(name string, value interface{}, timestamp time.Time, tags map[string]string) {
pt, err := models.NewPoint(
name, tags, map[string]interface{}{"value": value}, timestamp,
name, models.NewTags(tags), map[string]interface{}{"value": value}, timestamp,
)
if err != nil {
return
Expand Down Expand Up @@ -176,7 +176,7 @@ type WriteStatistics struct {
func (w *PointsWriter) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "write",
Tags: tags,
Tags: models.NewTags(tags),
Values: map[string]interface{}{
statWriteReq: atomic.LoadInt64(&w.stats.WriteReq),
statPointWriteReq: atomic.LoadInt64(&w.stats.PointWriteReq),
Expand Down
4 changes: 2 additions & 2 deletions coordinator/statement_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,7 @@ func (e *StatementExecutor) executeShowStatsStatement(stmt *influxql.ShowStatsSt
if stmt.Module != "" && stat.Name != stmt.Module {
continue
}
row := &models.Row{Name: stat.Name, Tags: stat.Tags}
row := &models.Row{Name: stat.Name, Tags: stat.Tags.Map()}

values := make([]interface{}, 0, len(stat.Values))
for _, k := range stat.ValueNames() {
Expand Down Expand Up @@ -1055,7 +1055,7 @@ func convertRowToPoints(measurementName string, row *models.Row) ([]models.Point
}
}

p, err := models.NewPoint(measurementName, row.Tags, vals, v[timeIndex].(time.Time))
p, err := models.NewPoint(measurementName, models.NewTags(row.Tags), vals, v[timeIndex].(time.Time))
if err != nil {
// Drop points that can't be stored
continue
Expand Down
2 changes: 1 addition & 1 deletion influxql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ type QueryStatistics struct {
func (e *QueryExecutor) Statistics(tags map[string]string) []models.Statistic {
return []models.Statistic{{
Name: "queryExecutor",
Tags: tags,
Tags: models.NewTags(tags),
Values: map[string]interface{}{
statQueriesActive: atomic.LoadInt64(&e.stats.ActiveQueries),
statQueriesExecuted: atomic.LoadInt64(&e.stats.ExecutedQueries),
Expand Down
Loading

0 comments on commit 8aa224b

Please sign in to comment.