Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
Sets the current time as the advertised time in the returned metric when using grpc.
  • Loading branch information
jcooklin committed Aug 3, 2016
1 parent 8f40854 commit bdd2bc0
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 5 deletions.
4 changes: 4 additions & 0 deletions control/plugin/client/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,10 @@ func (g *grpcClient) GetMetricTypes(config plugin.ConfigType) ([]core.Metric, er
return nil, errors.New(reply.Error)
}

for _, metric := range reply.Metrics {
metric.LastAdvertisedTime = common.ToTime(time.Now())
}

results := common.ToCoreMetrics(reply.Metrics)
return results, nil
}
Expand Down
14 changes: 11 additions & 3 deletions grpc/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ func ToMetric(co core.Metric) *Metric {
Nsec: int64(co.Timestamp().Nanosecond()),
},
LastAdvertisedTime: &Time{
Sec: co.LastAdvertisedTime().Unix(),
Nsec: int64(co.Timestamp().Nanosecond()),
Sec: time.Now().Unix(),
Nsec: int64(time.Now().Nanosecond()),
},
}
if co.Config() != nil {
Expand Down Expand Up @@ -127,12 +127,20 @@ func (m *metric) Unit() string { return m.unit }

// Convert common.Metric to core.Metric
func ToCoreMetric(mt *Metric) core.Metric {
var lastAdvertisedTime time.Time
// if the lastAdvertisedTime is not set we handle. -62135596800 represents the
// number of seconds from 0001-1970 and is the default value for time.Unix.
if mt.LastAdvertisedTime.Sec == int64(-62135596800) {
lastAdvertisedTime = time.Unix(time.Now().Unix(), int64(time.Now().Nanosecond()))
} else {
lastAdvertisedTime = time.Unix(mt.LastAdvertisedTime.Sec, mt.LastAdvertisedTime.Nsec)
}
ret := &metric{
namespace: ToCoreNamespace(mt.Namespace),
version: int(mt.Version),
tags: mt.Tags,
timeStamp: time.Unix(mt.Timestamp.Sec, mt.Timestamp.Nsec),
lastAdvertisedTime: time.Unix(mt.LastAdvertisedTime.Sec, mt.LastAdvertisedTime.Nsec),
lastAdvertisedTime: lastAdvertisedTime,
config: ConfigMapToConfig(mt.Config),
description: mt.Description,
unit: mt.Unit,
Expand Down
24 changes: 22 additions & 2 deletions scheduler/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,17 @@ func (p *processJob) Run() {
p.AddErrors(fmt.Errorf("unsupported metric type. {%v}", m))
}
}
enc.Encode(metrics)
err := enc.Encode(metrics)
if err != nil {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "processor",
"plugin-name": p.name,
"plugin-version": p.version,
"error": err,
}).Error("encoding error")
}
_, content, errs := p.processor.ProcessMetrics(p.contentType, buf.Bytes(), p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
Expand Down Expand Up @@ -467,7 +477,17 @@ func (p *publisherJob) Run() {
panic(fmt.Sprintf("unsupported type %T", mt))
}
}
enc.Encode(metrics)
err := enc.Encode(metrics)
if err != nil {
log.WithFields(log.Fields{
"_module": "scheduler-job",
"block": "run",
"job-type": "publisher",
"plugin-name": p.name,
"plugin-version": p.version,
"error": err,
}).Error("encoding error")
}
errs := p.publisher.PublishMetrics(p.contentType, buf.Bytes(), p.name, p.version, p.config, p.taskID)
if errs != nil {
for _, e := range errs {
Expand Down

0 comments on commit bdd2bc0

Please sign in to comment.