Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove global logger from outputs and monitoring #16761

Merged
merged 10 commits into from
Mar 5, 2020
2 changes: 2 additions & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
- Python 3 is required now to run python tests and tools. {pull}14798[14798]
- The type `memqueue.Broker` is no longer exported; instead of `memqueue.NewBroker`, call `memqueue.NewQueue` (which provides the same public interface). {pull}16667[16667]
- `queue.Eventer` has been renamed to `queue.ACKListener` {pull}16691[16691]
- Require logger as first parameter for `outputs.transport.transport#ProxyDialer` and `outputs.elasticsearch.client#BulkReadItemStatus`. {pull}16761[16761]

- The `libbeat/outputs/transport` package has been moved to `libbeat/common/transport`. {pull}16734[16734]
- The `libbeat/outputs/tls.go` file has been removed. All exported symbols in that file (`libbeat/outputs.*`) are now available as `libbeat/common/tlscommon.*`. {pull}16734[16734]

Expand Down
3 changes: 2 additions & 1 deletion heartbeat/monitors/active/dialchain/socks5.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/elastic/beats/v7/heartbeat/look"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common/transport"
"github.com/elastic/beats/v7/libbeat/logp"
)

// SOCKS5Layer configures a SOCKS5 proxy layer in a DialerChain.
Expand All @@ -38,7 +39,7 @@ func SOCKS5Layer(config *transport.ProxyConfig) Layer {
return func(event *beat.Event, next transport.Dialer) (transport.Dialer, error) {
var timer timer

dialer, err := transport.ProxyDialer(config, startTimerAfterDial(&timer, next))
dialer, err := transport.ProxyDialer(logp.NewLogger("socks5Layer"), config, startTimerAfterDial(&timer, next))
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions libbeat/common/transport/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,12 @@ import (
"time"

"github.com/elastic/beats/v7/libbeat/common/transport/tlscommon"
"github.com/elastic/beats/v7/libbeat/logp"
"github.com/elastic/beats/v7/libbeat/testing"
)

type Client struct {
log *logp.Logger
dialer Dialer
network string
host string
Expand Down Expand Up @@ -75,6 +77,7 @@ func NewClientWithDialer(d Dialer, c Config, network, host string, defaultPort i
}

client := &Client{
log: logp.NewLogger(logSelector),
dialer: d,
network: network,
host: host,
Expand Down Expand Up @@ -112,7 +115,7 @@ func (c *Client) Close() error {
defer c.mutex.Unlock()

if c.conn != nil {
debugf("closing")
c.log.Debug("closing")
err := c.conn.Close()
c.conn = nil
return err
Expand Down Expand Up @@ -199,7 +202,7 @@ func (c *Client) SetWriteDeadline(t time.Time) error {

func (c *Client) handleError(err error) error {
if err != nil {
debugf("handle error: %v", err)
c.log.Debugf("handle error: %+v", err)

if nerr, ok := err.(net.Error); !(ok && (nerr.Temporary() || nerr.Timeout())) {
_ = c.Close()
Expand Down
6 changes: 3 additions & 3 deletions libbeat/common/transport/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *ProxyConfig) Validate() error {
return nil
}

func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) {
func ProxyDialer(log *logp.Logger, config *ProxyConfig, forward Dialer) (Dialer, error) {
if config == nil || config.URL == "" {
return forward, nil
}
Expand All @@ -67,7 +67,7 @@ func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) {
return nil, err
}

logp.Info("proxy host: '%s'", url.Host)
log.Infof("proxy host: '%s'", url.Host)
return DialerFunc(func(network, address string) (net.Conn, error) {
var err error
var addresses []string
Expand All @@ -80,7 +80,7 @@ func ProxyDialer(config *ProxyConfig, forward Dialer) (Dialer, error) {
if config.LocalResolve {
addresses, err = net.LookupHost(host)
if err != nil {
logp.Warn(`DNS lookup failure "%s": %v`, host, err)
log.Warnf(`DNS lookup failure "%s": %+v`, host, err)
return nil, err
}
} else {
Expand Down
2 changes: 1 addition & 1 deletion libbeat/common/transport/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestNetDialer(d testing.Driver, timeout time.Duration) Dialer {
d.Fatal("dns lookup", err)
d.Info("addresses", strings.Join(addresses, ", "))
if err != nil {
logp.Warn(`DNS lookup failure "%s": %v`, host, err)
logp.NewLogger(logSelector).Warnf(`DNS lookup failure "%s": %+v`, host, err)
return nil, err
}

Expand Down
27 changes: 16 additions & 11 deletions libbeat/common/transport/tlscommon/tls.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/elastic/beats/v7/libbeat/logp"
)

const logSelector = "tls"

// LoadCertificate will load a certificate from disk and return a tls.Certificate or error
func LoadCertificate(config *CertificateConfig) (*tls.Certificate, error) {
certificate := config.Certificate
Expand All @@ -46,31 +48,33 @@ func LoadCertificate(config *CertificateConfig) (*tls.Certificate, error) {
return nil, nil
}

certPEM, err := ReadPEMFile(certificate, config.Passphrase)
log := logp.NewLogger(logSelector)

certPEM, err := ReadPEMFile(log, certificate, config.Passphrase)
if err != nil {
logp.Critical("Failed reading certificate file %v: %+v", certificate, err)
log.Errorf("Failed reading certificate file %v: %+v", certificate, err)
return nil, fmt.Errorf("%v %v", err, certificate)
}

keyPEM, err := ReadPEMFile(key, config.Passphrase)
keyPEM, err := ReadPEMFile(log, key, config.Passphrase)
if err != nil {
logp.Critical("Failed reading key file %v: %+v", key, err)
log.Errorf("Failed reading key file %v: %+v", key, err)
return nil, fmt.Errorf("%v %v", err, key)
}

cert, err := tls.X509KeyPair(certPEM, keyPEM)
if err != nil {
logp.Critical("Failed loading client certificate %+v", err)
log.Errorf("Failed loading client certificate %+v", err)
return nil, err
}

logp.Debug("tls", "loading certificate: %v and key %v", certificate, key)
log.Debugf("tls", "loading certificate: %v and key %v", certificate, key)
return &cert, nil
}

// ReadPEMFile reads a PEM format file on disk and decrypt it with the privided password and
// return the raw content.
func ReadPEMFile(path, passphrase string) ([]byte, error) {
func ReadPEMFile(log *logp.Logger, path, passphrase string) ([]byte, error) {
pass := []byte(passphrase)
var blocks []*pem.Block

Expand Down Expand Up @@ -102,7 +106,7 @@ func ReadPEMFile(path, passphrase string) ([]byte, error) {
}

if err != nil {
logp.Err("Dropping encrypted pem '%v' block read from %v. %v",
log.Errorf("Dropping encrypted pem '%v' block read from %v. %+v",
block.Type, path, err)
continue
}
Expand Down Expand Up @@ -138,21 +142,22 @@ func LoadCertificateAuthorities(CAs []string) (*x509.CertPool, []error) {
return nil, nil
}

log := logp.NewLogger(logSelector)
roots := x509.NewCertPool()
for _, path := range CAs {
pemData, err := ioutil.ReadFile(path)
if err != nil {
logp.Critical("Failed reading CA certificate: %v", err)
log.Errorf("Failed reading CA certificate: %+v", err)
errors = append(errors, fmt.Errorf("%v reading %v", err, path))
continue
}

if ok := roots.AppendCertsFromPEM(pemData); !ok {
logp.Critical("Failed to add CA to the cert pool, CA is not a valid PEM file")
log.Error("Failed to add CA to the cert pool, CA is not a valid PEM file")
errors = append(errors, fmt.Errorf("%v adding %v to the list of known CAs", ErrNotACertificate, path))
continue
}
logp.Debug("tls", "successfully loaded CA certificate: %v", path)
log.Debugf("tls", "successfully loaded CA certificate: %v", path)
}

return roots, errors
Expand Down
2 changes: 1 addition & 1 deletion libbeat/common/transport/tlscommon/tls_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (c *TLSConfig) ToConfig() *tls.Config {
minVersion, maxVersion := extractMinMaxVersion(c.Versions)
insecure := c.Verification != VerifyFull
if insecure {
logp.Warn("SSL/TLS verifications disabled.")
logp.NewLogger("tls").Warn("SSL/TLS verifications disabled.")
}

// When we are usign the CAsha256 pin to validate the CA used to validate the chain
Expand Down
4 changes: 1 addition & 3 deletions libbeat/common/transport/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ type DialerFunc func(network, address string) (net.Conn, error)

var (
ErrNotConnected = errors.New("client is not connected")

debugf = logp.MakeDebug("transport")
)

func (d DialerFunc) Dial(network, address string) (net.Conn, error) {
Expand All @@ -51,7 +49,7 @@ func Dial(c Config, network, address string) (net.Conn, error) {
func MakeDialer(c Config) (Dialer, error) {
var err error
dialer := NetDialer(c.Timeout)
dialer, err = ProxyDialer(c.Proxy, dialer)
dialer, err = ProxyDialer(logp.NewLogger(logSelector), c.Proxy, dialer)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions libbeat/common/transport/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"strings"
)

const logSelector = "transport"

func fullAddress(host string, defaultPort int) string {
if _, _, err := net.SplitHostPort(host); err == nil {
return host
Expand Down
18 changes: 9 additions & 9 deletions libbeat/monitoring/adapter/go-metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
type GoMetricsRegistry struct {
mutex sync.Mutex

log *logp.Logger
reg *monitoring.Registry
filters *metricFilters

Expand All @@ -60,20 +61,19 @@ func GetGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFil
if v == nil {
return NewGoMetrics(parent, name, filters...)
}

reg := v.(*monitoring.Registry)
return &GoMetricsRegistry{
reg: reg,
shadow: metrics.NewRegistry(),
filters: makeFilters(filters...),
}
return newGoMetrics(v.(*monitoring.Registry), filters...)
}

// NewGoMetrics creates and registers a new GoMetricsRegistry with the parent
// registry.
func NewGoMetrics(parent *monitoring.Registry, name string, filters ...MetricFilter) *GoMetricsRegistry {
return newGoMetrics(parent.NewRegistry(name, monitoring.IgnorePublishExpvar), filters...)
}

func newGoMetrics(reg *monitoring.Registry, filters ...MetricFilter) *GoMetricsRegistry {
return &GoMetricsRegistry{
reg: parent.NewRegistry(name, monitoring.IgnorePublishExpvar),
log: logp.NewLogger("monitoring"),
reg: reg,
shadow: metrics.NewRegistry(),
filters: makeFilters(filters...),
}
Expand Down Expand Up @@ -193,7 +193,7 @@ func (r *GoMetricsRegistry) UnregisterAll() {
r.shadow.UnregisterAll()
err := r.reg.Clear()
if err != nil {
logp.Err("Failed to clear registry: %v", err)
r.log.Errorf("Failed to clear registry: %+v", err)
}
}

Expand Down
24 changes: 13 additions & 11 deletions libbeat/monitoring/report/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
var createDocPrivAvailableESVersion = common.MustNewVersion("7.5.0")

type publishClient struct {
log *logp.Logger
es *esout.Client
params map[string]string
format report.Format
Expand All @@ -47,6 +48,7 @@ func newPublishClient(
format report.Format,
) (*publishClient, error) {
p := &publishClient{
log: logp.NewLogger(selector),
es: es,
params: params,
format: format,
Expand All @@ -55,7 +57,7 @@ func newPublishClient(
}

func (c *publishClient) Connect() error {
debugf("Monitoring client: connect.")
c.log.Debug("Monitoring client: connect.")

err := c.es.Connect()
if err != nil {
Expand Down Expand Up @@ -86,11 +88,11 @@ func (c *publishClient) Connect() error {
}

if !resp.Features.Monitoring.Enabled {
debugf("XPack monitoring is disabled.")
c.log.Debug("XPack monitoring is disabled.")
return errNoMonitoring
}

debugf("XPack monitoring is enabled")
c.log.Debug("XPack monitoring is enabled")

return nil
}
Expand All @@ -108,13 +110,13 @@ func (c *publishClient) Publish(batch publisher.Batch) error {
// Extract type
t, err := event.Content.Meta.GetValue("type")
if err != nil {
logp.Err("Type not available in monitoring reported. Please report this error: %s", err)
c.log.Errorf("Type not available in monitoring reported. Please report this error: %+v", err)
continue
}

typ, ok := t.(string)
if !ok {
logp.Err("monitoring type is not a string")
c.log.Error("monitoring type is not a string")
}

var params = map[string]string{}
Expand Down Expand Up @@ -235,7 +237,7 @@ func (c *publishClient) publishBulk(event publisher.Event, typ string) error {
return err
}

logBulkFailures(result, []report.Event{document})
logBulkFailures(c.log, result, []report.Event{document})
return err
}

Expand All @@ -245,25 +247,25 @@ func getMonitoringIndexName() string {
return fmt.Sprintf(".monitoring-beats-%v-%s", version, date)
}

func logBulkFailures(result esout.BulkResult, events []report.Event) {
func logBulkFailures(log *logp.Logger, result esout.BulkResult, events []report.Event) {
reader := esout.NewJSONReader(result)
err := esout.BulkReadToItems(reader)
if err != nil {
logp.Err("failed to parse monitoring bulk items: %v", err)
log.Errorf("failed to parse monitoring bulk items: %+v", err)
return
}

for i := range events {
status, msg, err := esout.BulkReadItemStatus(reader)
status, msg, err := esout.BulkReadItemStatus(log, reader)
if err != nil {
logp.Err("failed to parse monitoring bulk item status: %v", err)
log.Errorf("failed to parse monitoring bulk item status: %+v", err)
return
}
switch {
case status < 300, status == http.StatusConflict:
continue
default:
logp.Warn("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
log.Warnf("monitoring bulk item insert failed (i=%v, status=%v): %s", i, status, msg)
}
}
}
Loading