-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
Copy pathmb.go
424 lines (357 loc) · 14.4 KB
/
mb.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
/*
Package mb (short for Metricbeat) contains the public interfaces that are used
to implement Modules and their associated MetricSets.
*/
package mb
import (
"context"
"fmt"
"net/url"
"time"
"github.com/pkg/errors"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/monitoring"
"github.com/elastic/beats/v7/metricbeat/helper/dialer"
)
const (
	// TimestampKey is the key used in events created by MetricSets to add their
	// own timestamp to an event. If a timestamp is not specified then the time
	// that the fetch started will be used.
	TimestampKey string = "@timestamp"

	// ModuleDataKey is the key used in events created by MetricSets to add data
	// to an event that is common to the module. The data must be a
	// common.MapStr and when the final event is built the object will be stored
	// in the event under a key that is the module name.
	ModuleDataKey string = "_module"

	// NamespaceKey is used to define a different namespace for the metricset.
	// This is useful for dynamic metricsets or metricsets which do not
	// put the data under the same name as the package. This is for example
	// the case in elasticsearch `node_stats` which puts the data under `node.stats`.
	NamespaceKey string = "_namespace"

	// RTTKey is used by a MetricSet to specify the round trip time (RTT), or
	// total amount of time, taken to collect the information in the event. The
	// data must be of type time.Duration otherwise the value is ignored.
	RTTKey string = "_rtt"
)
// Module interfaces
// Module is the common interface for all Module implementations.
type Module interface {
	// Name returns the name of the Module.
	Name() string

	// Config returns the ModuleConfig used to create the Module.
	Config() ModuleConfig

	// UnpackConfig unpacks the raw module config to the given object.
	UnpackConfig(to interface{}) error
}
// BaseModule implements the Module interface.
//
// When a Module needs to store additional data or provide methods to its
// MetricSets, it can embed this type into another struct to satisfy the
// Module interface requirements.
type BaseModule struct {
	name      string         // Module name, as returned by Name.
	config    ModuleConfig   // Parsed base configuration, as returned by Config.
	rawConfig *common.Config // Raw configuration that UnpackConfig unpacks from.
}
// String returns a compact textual form of the module made of its name and
// the string form of its configuration.
func (m *BaseModule) String() string {
	cfg := m.config.String()
	return fmt.Sprintf(`{name:"%v", config:%v}`, m.name, cfg)
}
// GoString implements fmt.GoStringer by returning the same text as String.
func (m *BaseModule) GoString() string {
	return m.String()
}
// Name reports the module's name.
func (m *BaseModule) Name() string {
	return m.name
}
// Config reports the ModuleConfig the module was created with. The
// configuration is returned by value.
func (m *BaseModule) Config() ModuleConfig {
	return m.config
}
// UnpackConfig decodes the module's raw configuration into the object
// pointed to by to and returns any error reported by the unpack.
func (m *BaseModule) UnpackConfig(to interface{}) error {
	raw := m.rawConfig
	return raw.Unpack(to)
}
// WithConfig returns a copy of the module that has been re-configured with
// the given raw configuration. The receiver itself is not modified.
//
// It is intended to be called from module factories. Note that if metricsets
// are specified in the new configuration, those metricsets must already be
// registered with mb.Registry.
//
// The module name cannot be changed: a configuration that names a different
// module is rejected, and the current name is written into the configuration
// before it is unpacked into the new module's ModuleConfig.
func (m *BaseModule) WithConfig(config common.Config) (*BaseModule, error) {
	// Read only the module name carried by the new configuration, if any.
	var probe struct {
		Module string `config:"module"`
	}
	err := config.Unpack(&probe)
	if err != nil {
		return nil, errors.Wrap(err, "error parsing new module configuration")
	}

	// Don't allow module name change.
	if requested := probe.Module; requested != "" && requested != m.name {
		return nil, fmt.Errorf("cannot change module name from %v to %v", m.name, requested)
	}

	// Make sure the existing module name is present in the new configuration.
	err = config.SetString("module", -1, m.name)
	if err != nil {
		return nil, errors.Wrap(err, "unable to set existing module name in new configuration")
	}

	reconfigured := &BaseModule{
		name:      m.name,
		rawConfig: &config,
	}
	err = config.Unpack(&reconfigured.config)
	if err != nil {
		return nil, errors.Wrap(err, "error parsing new module configuration")
	}

	return reconfigured, nil
}
// MetricSet interfaces
// MetricSet is the common interface for all MetricSet implementations. In
// addition to this interface, all MetricSets must implement a fetcher interface.
type MetricSet interface {
	// ID returns the unique ID identifying a running MetricSet.
	ID() string

	// Name returns the name of the MetricSet.
	Name() string

	// Module returns the parent Module for the MetricSet.
	Module() Module

	// Host returns a hostname or other module specific value that identifies
	// a specific host or service instance from which to collect metrics.
	Host() string

	// HostData returns the parsed host data.
	HostData() HostData

	// Registration returns the params used in registration.
	Registration() MetricSetRegistration

	// Metrics returns the MetricSet specific metrics.
	Metrics() *monitoring.Registry

	// Logger returns the MetricSet specific logger.
	Logger() *logp.Logger
}
// Closer is an optional interface that a MetricSet can implement in order to
// clean up any resources it has open at shutdown.
type Closer interface {
	// Close releases the resources held by the MetricSet.
	Close() error
}
// Reporter is used by a MetricSet to report events, errors, or errors with
// metadata. The methods return false if and only if publishing failed because
// the MetricSet is being closed.
//
// Deprecated: Use ReporterV2.
type Reporter interface {
	// Event reports a single successful event.
	Event(event common.MapStr) bool

	// ErrorWith reports a single error event with the additional metadata.
	ErrorWith(err error, meta common.MapStr) bool

	// Error reports a single error event.
	Error(err error) bool
}
// ReportingMetricSet is a MetricSet that reports events or errors through the
// Reporter interface. Fetch is called periodically to collect events.
//
// Deprecated: Use ReportingMetricSetV2.
type ReportingMetricSet interface {
	MetricSet

	// Fetch collects events and reports them, or any errors, through r.
	Fetch(r Reporter)
}
// PushReporter is used by a MetricSet to report events, errors, or errors with
// metadata. It provides a done channel used to signal that the reporter should
// stop.
//
// Deprecated: Use PushReporterV2.
type PushReporter interface {
	Reporter

	// Done returns a channel that's closed when work done on behalf of this
	// reporter should be canceled.
	Done() <-chan struct{}
}
// PushMetricSet is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
// the PushReporter's done channel is closed.
//
// Deprecated: Use PushMetricSetV2.
type PushMetricSet interface {
	MetricSet

	// Run starts the event subscription and reports events through r. It
	// should block until the MetricSet is ready to stop or r's done channel
	// is closed.
	Run(r PushReporter)
}
// V2 Interfaces
// ReporterV2 is used by a MetricSet to report Events. The methods return false
// if and only if publishing failed because the MetricSet is being closed.
type ReporterV2 interface {
	// Event reports a single successful event.
	Event(event Event) bool

	// Error reports a single error.
	Error(err error) bool
}
// PushReporterV2 is used by a MetricSet to report events, errors, or errors with
// metadata. It provides a done channel used to signal that the reporter should
// stop.
type PushReporterV2 interface {
	ReporterV2

	// Done returns a channel that's closed when work done on behalf of this
	// reporter should be canceled.
	Done() <-chan struct{}
}
// ReportingMetricSetV2 is a MetricSet that reports events or errors through the
// ReporterV2 interface. Fetch is called periodically to collect events.
type ReportingMetricSetV2 interface {
	MetricSet

	// Fetch collects events and reports them, or any errors, through r.
	Fetch(r ReporterV2)
}
// ReportingMetricSetV2Error is a MetricSet that reports events or errors through the
// ReporterV2 interface. Fetch is called periodically to collect events. Unlike
// ReportingMetricSetV2, its Fetch returns an error.
type ReportingMetricSetV2Error interface {
	MetricSet

	// Fetch collects events and reports them through r, returning an error
	// if the collection failed.
	Fetch(r ReporterV2) error
}
// ReportingMetricSetV2WithContext is a MetricSet that reports events or errors through the
// ReporterV2 interface. Fetch is called periodically to collect events. Its
// Fetch additionally receives a context.Context and returns an error.
type ReportingMetricSetV2WithContext interface {
	MetricSet

	// Fetch collects events and reports them through r, returning an error
	// if the collection failed. ctx is passed in by the caller.
	Fetch(ctx context.Context, r ReporterV2) error
}
// PushMetricSetV2 is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
// the PushReporterV2's done channel is closed.
type PushMetricSetV2 interface {
	MetricSet

	// Run starts the event subscription and reports events through r. It
	// should block until the MetricSet is ready to stop or r's done channel
	// is closed.
	Run(r PushReporterV2)
}
// PushMetricSetV2WithContext is a MetricSet that pushes events (rather than pulling them
// periodically via a Fetch callback). Run is invoked to start the event
// subscription and it should block until the MetricSet is ready to stop or
// the context is closed.
type PushMetricSetV2WithContext interface {
	MetricSet

	// Run starts the event subscription and reports events through r. It
	// should block until the MetricSet is ready to stop or ctx is closed.
	// Note that r is a plain ReporterV2: the stop signal comes from ctx
	// rather than from a done channel on the reporter.
	Run(ctx context.Context, r ReporterV2)
}
// HostData contains values parsed from the 'host' configuration. Other
// configuration data like protocols, usernames, and passwords may also be
// used to construct this HostData data. HostData also contains information
// for when combined schemes are used, like doing an HTTP request over a UNIX
// socket.
type HostData struct {
	Transport    dialer.Builder // The transport builder to use when creating the connection.
	URI          string         // The full URI that should be used in connections.
	SanitizedURI string         // A sanitized version of the URI without credentials.

	// Parts of the URI.
	Host     string // The host and possibly port.
	User     string // Username
	Password string // Password
}
// String returns a textual form of the host data limited to the sanitized URI
// and the host. The full URI, user and password are not part of the output.
func (h HostData) String() string {
	const layout = `{SanitizedURI:"%v", Host:"%v"}`
	return fmt.Sprintf(layout, h.SanitizedURI, h.Host)
}
// GoString implements fmt.GoStringer by returning the same text as String.
func (h HostData) GoString() string {
	return h.String()
}
// BaseMetricSet implements the MetricSet interface.
//
// The BaseMetricSet type can be embedded into another struct to satisfy the
// MetricSet interface requirements, leaving only the Fetch() method to be
// implemented to have a complete MetricSet implementation.
type BaseMetricSet struct {
	id           string                // Unique ID, as returned by ID.
	name         string                // MetricSet name, as returned by Name.
	module       Module                // Parent module, as returned by Module.
	host         string                // Host value, as returned by Host.
	hostData     HostData              // Parsed host data, as returned by HostData.
	registration MetricSetRegistration // Registration parameters, as returned by Registration.
	metrics      *monitoring.Registry  // Metrics registry, as returned by Metrics.
	logger       *logp.Logger          // Logger, as returned by Logger.
}
// String returns a textual form of the MetricSet containing its name, the
// name of its module, its host data and its registration. When no module is
// set, the module name is rendered as "nil".
func (b *BaseMetricSet) String() string {
	const layout = `{name:"%v", module:"%v", hostData:%v, registration:%v}`

	var moduleName string
	if b.module == nil {
		moduleName = "nil"
	} else {
		moduleName = b.module.Name()
	}

	hostData := b.hostData.String()
	return fmt.Sprintf(layout, b.name, moduleName, hostData, b.registration)
}
// GoString implements fmt.GoStringer by returning the same text as String.
func (b *BaseMetricSet) GoString() string {
	return b.String()
}
// ID reports the unique ID of the MetricSet.
func (b *BaseMetricSet) ID() string { return b.id }
// Metrics reports the monitoring registry held by the MetricSet.
func (b *BaseMetricSet) Metrics() *monitoring.Registry { return b.metrics }
// Logger reports the logger held by the MetricSet.
func (b *BaseMetricSet) Logger() *logp.Logger { return b.logger }
// Name reports the name of the MetricSet. It should not include the name of
// the module; see FullyQualifiedName for the combined form.
func (b *BaseMetricSet) Name() string { return b.name }
// FullyQualifiedName reports the complete name of the MetricSet in the form
// "<module name>/<metricset name>". The parent module must be set, since its
// Name method is called.
func (b *BaseMetricSet) FullyQualifiedName() string {
	moduleName := b.Module().Name()
	return moduleName + "/" + b.Name()
}
// Module reports the parent Module of the MetricSet.
func (b *BaseMetricSet) Module() Module { return b.module }
// Host reports the hostname, or other module specific value, that identifies
// the specific host or service instance from which to collect metrics.
func (b *BaseMetricSet) Host() string { return b.host }
// HostData reports the parsed host data. The value is returned by copy.
func (b *BaseMetricSet) HostData() HostData { return b.hostData }
// Registration reports the parameters that were used when the MetricSet was
// registered with the registry.
func (b *BaseMetricSet) Registration() MetricSetRegistration { return b.registration }
// Configuration types
// ModuleConfig is the base configuration data for all Modules.
//
// The Raw config option is used to enable raw fields in a metricset. This means
// the metricset fetches not only the predefined fields but also adds all raw
// data under the raw namespace to the event.
type ModuleConfig struct {
	Hosts       []string      `config:"hosts"`
	Period      time.Duration `config:"period" validate:"positive"`
	Timeout     time.Duration `config:"timeout" validate:"positive"`
	Module      string        `config:"module" validate:"required"`
	MetricSets  []string      `config:"metricsets"`
	Enabled     bool          `config:"enabled"`
	Raw         bool          `config:"raw"`
	Query       QueryParams   `config:"query"`
	ServiceName string        `config:"service.name"`
}
// String returns a textual form of the configuration. Hosts are summarized
// by their count rather than listed, and ServiceName is not part of the
// output.
func (c ModuleConfig) String() string {
	const layout = `{Module:"%v", MetricSets:%v, Enabled:%v, ` +
		`Hosts:[%v hosts], Period:"%v", Timeout:"%v", Raw:%v, Query:%v}`

	hostCount := len(c.Hosts)
	return fmt.Sprintf(layout,
		c.Module, c.MetricSets, c.Enabled,
		hostCount, c.Period, c.Timeout,
		c.Raw, c.Query)
}
// GoString implements fmt.GoStringer by returning the same text as String.
func (c ModuleConfig) GoString() string {
	return c.String()
}
// QueryParams is a convenient map[string]interface{} wrapper that implements
// fmt.Stringer, returning the values in common query params format
// (key=value&key2=value2), which is the way that the url package expects
// these params (without the initial '?').
type QueryParams map[string]interface{}

// String returns the values in common query params format
// (key=value&key2=value2), which is the way that the url package expects
// these params (without the initial '?').
//
// A value of type []interface{} contributes one key=value pair per element,
// in element order; an empty list contributes nothing. Any other value is
// formatted with %v. A nil value, whether it is the value itself or an
// element of a list, is emitted as an empty string rather than "<nil>".
// The result is URL-encoded and sorted by key by url.Values.Encode.
func (q QueryParams) String() string {
	u := url.Values{}

	// add appends a single value for key. Nil values in YAML shouldn't be
	// stringified, so they become an empty value.
	add := func(key string, value interface{}) {
		if value == nil {
			u.Add(key, "")
			return
		}
		u.Add(key, fmt.Sprintf("%v", value))
	}

	for k, v := range q {
		values, isList := v.([]interface{})
		if !isList {
			add(k, v)
			continue
		}
		for _, innerValue := range values {
			add(k, innerValue)
		}
	}

	return u.Encode()
}
// defaultModuleConfig contains the default values for ModuleConfig instances:
// modules are enabled and use a period of ten seconds. All other fields are
// left at their zero value.
var defaultModuleConfig = ModuleConfig{
	Enabled: true,
	Period:  time.Second * 10,
}
// DefaultModuleConfig returns a ModuleConfig populated with the package
// defaults. The struct is returned by value.
func DefaultModuleConfig() ModuleConfig {
	defaults := defaultModuleConfig
	return defaults
}