diff --git a/control/plugin/client/grpc.go b/control/plugin/client/grpc.go index 03d09db4a..236c62896 100644 --- a/control/plugin/client/grpc.go +++ b/control/plugin/client/grpc.go @@ -288,6 +288,10 @@ func (g *grpcClient) GetMetricTypes(config plugin.ConfigType) ([]core.Metric, er return nil, errors.New(reply.Error) } + for _, metric := range reply.Metrics { + metric.LastAdvertisedTime = common.ToTime(time.Now()) + } + results := common.ToCoreMetrics(reply.Metrics) return results, nil } diff --git a/grpc/common/common.go b/grpc/common/common.go index a7a542740..2ba08915a 100644 --- a/grpc/common/common.go +++ b/grpc/common/common.go @@ -43,8 +43,8 @@ func ToMetric(co core.Metric) *Metric { Nsec: int64(co.Timestamp().Nanosecond()), }, LastAdvertisedTime: &Time{ - Sec: co.LastAdvertisedTime().Unix(), - Nsec: int64(co.Timestamp().Nanosecond()), + Sec: time.Now().Unix(), + Nsec: int64(time.Now().Nanosecond()), }, } if co.Config() != nil { @@ -127,12 +127,20 @@ func (m *metric) Unit() string { return m.unit } // Convert common.Metric to core.Metric func ToCoreMetric(mt *Metric) core.Metric { + var lastAdvertisedTime time.Time + // if the lastAdvertisedTime is not set we handle. -62135596800 represents the + // number of seconds from 0001-1970 and is the default value for time.Unix. + if mt.LastAdvertisedTime.Sec == int64(-62135596800) { + lastAdvertisedTime = time.Unix(time.Now().Unix(), int64(time.Now().Nanosecond())) + } else { + lastAdvertisedTime = time.Unix(mt.LastAdvertisedTime.Sec, mt.LastAdvertisedTime.Nsec) + } ret := &metric{ namespace: ToCoreNamespace(mt.Namespace), version: int(mt.Version), tags: mt.Tags, timeStamp: time.Unix(mt.Timestamp.Sec, mt.Timestamp.Nsec), - lastAdvertisedTime: time.Unix(mt.LastAdvertisedTime.Sec, mt.LastAdvertisedTime.Nsec), + lastAdvertisedTime: lastAdvertisedTime, config: ConfigMapToConfig(mt.Config), description: mt.Description, unit: mt.Unit, diff --git a/scheduler/job.go b/scheduler/job.go index 14566dca6..ce35a11cc 100644 --- a/scheduler/job.go +++ b/scheduler/job.go @@ -343,7 +343,17 @@ func (p *processJob) Run() { p.AddErrors(fmt.Errorf("unsupported metric type. {%v}", m)) } } - enc.Encode(metrics) + err := enc.Encode(metrics) + if err != nil { + log.WithFields(log.Fields{ + "_module": "scheduler-job", + "block": "run", + "job-type": "processor", + "plugin-name": p.name, + "plugin-version": p.version, + "error": err, + }).Error("encoding error") + } _, content, errs := p.processor.ProcessMetrics(p.contentType, buf.Bytes(), p.name, p.version, p.config, p.taskID) if errs != nil { for _, e := range errs { @@ -467,7 +477,17 @@ func (p *publisherJob) Run() { panic(fmt.Sprintf("unsupported type %T", mt)) } } - enc.Encode(metrics) + err := enc.Encode(metrics) + if err != nil { + log.WithFields(log.Fields{ + "_module": "scheduler-job", + "block": "run", + "job-type": "publisher", + "plugin-name": p.name, + "plugin-version": p.version, + "error": err, + }).Error("encoding error") + } errs := p.publisher.PublishMetrics(p.contentType, buf.Bytes(), p.name, p.version, p.config, p.taskID) if errs != nil { for _, e := range errs {