From bb30d44c7e1a218712714685a6c00b383bc23186 Mon Sep 17 00:00:00 2001
From: Max Englander
Date: Fri, 21 Feb 2025 16:40:09 -0500
Subject: [PATCH] go/vt/discovery: configurable logger

Signed-off-by: Max Englander
---
 go/vt/discovery/discovery_options.go   | 67 ++++++++++++++++++++++++++
 go/vt/discovery/healthcheck.go         | 54 +++++++++++++--------
 go/vt/discovery/tablet_health_check.go | 10 ++--
 go/vt/discovery/topology_watcher.go    | 40 +++++++++++----
 4 files changed, 139 insertions(+), 32 deletions(-)
 create mode 100644 go/vt/discovery/discovery_options.go

diff --git a/go/vt/discovery/discovery_options.go b/go/vt/discovery/discovery_options.go
new file mode 100644
index 00000000000..c5a2b1762d3
--- /dev/null
+++ b/go/vt/discovery/discovery_options.go
@@ -0,0 +1,67 @@
+/*
+Copyright 2025 The Vitess Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package discovery
+
+import (
+	"vitess.io/vitess/go/vt/logutil"
+)
+
+// discoveryOptions configures a discovery component. discoveryOptions are set
+// by the DiscoveryOption values passed to the component constructors.
+type discoveryOptions struct {
+	logger logutil.Logger
+}
+
+// DiscoveryOption configures how we perform certain operations.
+type DiscoveryOption interface {
+	apply(*discoveryOptions)
+}
+
+// funcDiscoveryOption wraps a function that modifies discoveryOptions into
+// an implementation of the DiscoveryOption interface.
+type funcDiscoveryOption struct {
+	f func(*discoveryOptions)
+}
+
+func defaultOptions() discoveryOptions {
+	return discoveryOptions{
+		logger: logutil.NewConsoleLogger(),
+	}
+}
+
+func withOptions(dos ...DiscoveryOption) discoveryOptions {
+	os := defaultOptions()
+	for _, do := range dos {
+		do.apply(&os)
+	}
+	return os
+}
+
+func (fdo *funcDiscoveryOption) apply(dos *discoveryOptions) {
+	fdo.f(dos)
+}
+
+func newFuncDiscoveryOption(f func(*discoveryOptions)) *funcDiscoveryOption {
+	return &funcDiscoveryOption{
+		f: f,
+	}
+}
+
+// WithLogger accepts a custom logger to use in a discovery component. If this
+// option is not provided then the default system logger will be used.
+func WithLogger(l logutil.Logger) DiscoveryOption {
+	return newFuncDiscoveryOption(func(o *discoveryOptions) {
+		o.logger = l
+	})
+}
diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go
index 5734749b167..55e49a6bc64 100644
--- a/go/vt/discovery/healthcheck.go
+++ b/go/vt/discovery/healthcheck.go
@@ -52,6 +52,7 @@ import (
 	"vitess.io/vitess/go/netutil"
 	"vitess.io/vitess/go/stats"
 	"vitess.io/vitess/go/vt/log"
+	"vitess.io/vitess/go/vt/logutil"
 	"vitess.io/vitess/go/vt/proto/query"
 	"vitess.io/vitess/go/vt/proto/topodata"
 	"vitess.io/vitess/go/vt/proto/vtrpc"
@@ -188,8 +189,10 @@ func FilteringKeyspaces() bool {
 	return len(KeyspacesToWatch) > 0
 }
 
-type KeyspaceShardTabletType string
-type tabletAliasString string
+type (
+	KeyspaceShardTabletType string
+	tabletAliasString       string
+)
 
 // HealthCheck declares what the TabletGateway needs from the HealthCheck
 type HealthCheck interface {
@@ -299,6 +302,9 @@ type HealthCheckImpl struct {
 	subscribers map[chan *TabletHealth]struct{}
 	// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
 	loadTabletsTrigger chan struct{}
+	// options contains optional settings used to modify HealthCheckImpl
+	// behavior.
+	options discoveryOptions
 }
 
 // NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
@@ -350,9 +356,9 @@ func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) {
 // filters.
 //
 // Is one or more filters to apply when determining what tablets we want to stream healthchecks from.
-func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl {
-	log.Infof("loading tablets for cells: %v", cellsToWatch)
-
+func NewHealthCheck(
+	ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter, opts ...DiscoveryOption,
+) *HealthCheckImpl {
 	hc := &HealthCheckImpl{
 		ts:                 topoServer,
 		cell:               localCell,
@@ -364,7 +370,11 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
 		subscribers:        make(map[chan *TabletHealth]struct{}),
 		cellAliases:        make(map[string]string),
 		loadTabletsTrigger: make(chan struct{}, 1),
+		options:            withOptions(opts...),
 	}
+
+	hc.logger().Infof("loading tablets for cells: %v", cellsToWatch)
+
 	var topoWatchers []*TopologyWatcher
 	cells := strings.Split(cellsToWatch, ",")
 	if cellsToWatch == "" {
@@ -372,11 +382,11 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
 	}
 
 	for _, c := range cells {
-		log.Infof("Setting up healthcheck for cell: %v", c)
+		hc.logger().Infof("Setting up healthcheck for cell: %v", c)
 		if c == "" {
 			continue
 		}
-		topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets))
+		topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, opts...))
 	}
 
 	hc.topoWatchers = topoWatchers
@@ -401,7 +411,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
 		return
 	}
 
-	log.Infof("Adding tablet to healthcheck: %v", tablet)
+	hc.logger().Infof("Adding tablet to healthcheck: %v", tablet)
 	hc.mu.Lock()
 	defer hc.mu.Unlock()
 	if hc.healthByAlias == nil {
@@ -419,6 +429,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
 		cancelFunc: cancelFunc,
 		Tablet:     tablet,
 		Target:     target,
+		logger:     hc.logger(),
 	}
 
 	// add to our datastore
@@ -426,7 +437,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
 	tabletAlias := topoproto.TabletAliasString(tablet.Alias)
 	if _, ok := hc.healthByAlias[tabletAliasString(tabletAlias)]; ok {
 		// We should not add a tablet that we already have
-		log.Errorf("Program bug: tried to add existing tablet: %v to healthcheck", tabletAlias)
+		hc.logger().Errorf("Program bug: tried to add existing tablet: %v to healthcheck", tabletAlias)
 		return
 	}
 	hc.healthByAlias[tabletAliasString(tabletAlias)] = thc
@@ -454,7 +465,7 @@ func (hc *HealthCheckImpl) ReplaceTablet(old, new *topodata.Tablet) {
 }
 
 func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
-	log.Infof("Removing tablet from healthcheck: %v", tablet)
+	hc.logger().Infof("Removing tablet from healthcheck: %v", tablet)
 	hc.mu.Lock()
 	defer hc.mu.Unlock()
 
@@ -497,7 +508,7 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
 		// delete from authoritative map
 		th, ok := hc.healthByAlias[tabletAlias]
 		if !ok {
-			log.Infof("We have no health data for tablet: %v, it might have been deleted already", tablet)
+			hc.logger().Infof("We have no health data for tablet: %v, it might have been deleted already", tablet)
 			return
 		}
 		// Calling this will end the context associated with th.checkConn,
@@ -516,7 +527,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
 	// so that we're not racing to update it and in effect re-adding a copy of the
 	// tablet record that was deleted
 	if _, ok := hc.healthByAlias[tabletAlias]; !ok {
-		log.Infof("Tablet %v has been deleted, skipping health update", th.Tablet)
+		hc.logger().Infof("Tablet %v has been deleted, skipping health update", th.Tablet)
 		return
 	}
 
@@ -541,7 +552,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
 		// causing an interruption where no primary is assigned to the shard.
 		if prevTarget.TabletType == topodata.TabletType_PRIMARY {
 			if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 {
-				log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
+				hc.logger().Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
 				// We want to trigger a loadTablets call, but if the channel is not empty
 				// then a trigger is already scheduled, we don't need to trigger another one.
 				// This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
@@ -567,7 +578,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
 			// We already have one up server, see if we
 			// need to replace it.
 			if th.PrimaryTermStartTime < hc.healthy[targetKey][0].PrimaryTermStartTime {
-				log.Warningf("not marking healthy primary %s as Up for %s because its PrimaryTermStartTime is smaller than the highest known timestamp from previous PRIMARYs %s: %d < %d ",
+				hc.logger().Warningf("not marking healthy primary %s as Up for %s because its PrimaryTermStartTime is smaller than the highest known timestamp from previous PRIMARYs %s: %d < %d ",
 					topoproto.TabletAliasString(th.Tablet.Alias),
 					topoproto.KeyspaceShardString(th.Target.Keyspace, th.Target.Shard),
 					topoproto.TabletAliasString(hc.healthy[targetKey][0].Tablet.Alias),
@@ -604,7 +615,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
 
 	isNewPrimary := isPrimary && prevTarget.TabletType != topodata.TabletType_PRIMARY
 	if isNewPrimary {
-		log.Errorf("Adding 1 to PrimaryPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType)
+		hc.logger().Errorf("Adding 1 to PrimaryPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType)
 		hcPrimaryPromotedCounters.Add([]string{th.Target.Keyspace, th.Target.Shard}, 1)
 	}
 
@@ -656,7 +667,7 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
 		default:
 			// If the channel is full, we drop the message.
 			hcChannelFullCounter.Add(1)
-			log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
+			hc.logger().Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
 		}
 	}
 }
@@ -837,7 +848,7 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.
 			timer.Stop()
 			for _, target := range targets {
 				if target != nil {
-					log.Infof("couldn't find tablets for target: %v", target)
+					hc.logger().Infof("couldn't find tablets for target: %v", target)
 				}
 			}
 			return ctx.Err()
@@ -966,7 +977,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
 	if err != nil {
 		// Error logged
 		if _, err := w.Write([]byte(err.Error())); err != nil {
-			log.Errorf("write to buffer error failed: %v", err)
+			hc.logger().Errorf("write to buffer error failed: %v", err)
 		}
 
 		return
@@ -977,7 +988,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
 
 	// Error logged
 	if _, err := w.Write(buf.Bytes()); err != nil {
-		log.Errorf("write to buffer bytes failed: %v", err)
+		hc.logger().Errorf("write to buffer bytes failed: %v", err)
 	}
 }
 
@@ -1018,6 +1029,11 @@ func (hc *HealthCheckImpl) stateChecksum() int64 {
 	return int64(crc32.ChecksumIEEE(buf.Bytes()))
 }
 
+// logger returns the logutil.Logger used by the healthcheck.
+func (hc *HealthCheckImpl) logger() logutil.Logger {
+	return hc.options.logger
+}
+
 // TabletToMapKey creates a key to the map from tablet's host and ports.
 // It should only be used in discovery and related module.
 func TabletToMapKey(tablet *topodata.Tablet) string {
diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go
index ecadeefdf78..4c6e569cfc5 100644
--- a/go/vt/discovery/tablet_health_check.go
+++ b/go/vt/discovery/tablet_health_check.go
@@ -25,7 +25,7 @@ import (
 	"time"
 
 	"vitess.io/vitess/go/vt/grpcclient"
-	"vitess.io/vitess/go/vt/log"
+	"vitess.io/vitess/go/vt/logutil"
 	"vitess.io/vitess/go/vt/proto/vtrpc"
 	"vitess.io/vitess/go/vt/topo/topoproto"
 	"vitess.io/vitess/go/vt/topotools"
@@ -71,6 +71,8 @@ type tabletHealthCheck struct {
 	// possibly delete both these
 	loggedServingState    bool
 	lastResponseTimestamp time.Time // timestamp of the last healthcheck response
+	// logger is used to log messages.
+	logger logutil.Logger
 }
 
 // String is defined because we want to print a []*tabletHealthCheck array nicely.
@@ -107,7 +109,7 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
 	if !thc.loggedServingState || (serving != thc.Serving) {
 		// Emit the log from a separate goroutine to avoid holding
 		// the th lock while logging is happening
-		log.Infof("HealthCheckUpdate(Serving State): tablet: %v serving %v => %v for %v/%v (%v) reason: %s",
+		thc.logger.Infof("HealthCheckUpdate(Serving State): tablet: %v serving %v => %v for %v/%v (%v) reason: %s",
 			topotools.TabletIdent(thc.Tablet),
 			thc.Serving,
 			serving,
@@ -294,7 +296,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
 			// the healthcheck cache again via the topology watcher.
 			// WARNING: Under no other circumstances should we be deleting the tablet here.
 			if strings.Contains(err.Error(), "health stats mismatch") {
-				log.Warningf("deleting tablet %v from healthcheck due to health stats mismatch", thc.Tablet)
+				thc.logger.Warningf("deleting tablet %v from healthcheck due to health stats mismatch", thc.Tablet)
 				hc.deleteTablet(thc.Tablet)
 				return
 			}
@@ -331,7 +333,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
 }
 
 func (thc *tabletHealthCheck) closeConnection(ctx context.Context, err error) {
-	log.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet, err)
+	thc.logger.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet, err)
 	thc.setServingState(false, err.Error())
 	thc.LastError = err
 	_ = thc.Conn.Close(ctx)
diff --git a/go/vt/discovery/topology_watcher.go b/go/vt/discovery/topology_watcher.go
index d1e358e1aa5..279cfb9146f 100644
--- a/go/vt/discovery/topology_watcher.go
+++ b/go/vt/discovery/topology_watcher.go
@@ -29,7 +29,7 @@ import (
 	"vitess.io/vitess/go/stats"
 	"vitess.io/vitess/go/trace"
 	"vitess.io/vitess/go/vt/key"
-	"vitess.io/vitess/go/vt/log"
+	"vitess.io/vitess/go/vt/logutil"
 	topodatapb "vitess.io/vitess/go/vt/proto/topodata"
 	"vitess.io/vitess/go/vt/topo"
 	"vitess.io/vitess/go/vt/topo/topoproto"
@@ -84,11 +84,16 @@ type TopologyWatcher struct {
 	firstLoadDone bool
 	// firstLoadChan is closed when the initial load of topology data is complete.
 	firstLoadChan chan struct{}
+	// options contains optional settings used to modify TopologyWatcher
+	// behavior.
+	options discoveryOptions
 }
 
 // NewTopologyWatcher returns a TopologyWatcher that monitors all
 // the tablets in a cell, and reloads them as needed.
-func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool) *TopologyWatcher {
+func NewTopologyWatcher(
+	ctx context.Context, topoServer *topo.Server, hc HealthCheck, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, opts ...DiscoveryOption,
+) *TopologyWatcher {
 	tw := &TopologyWatcher{
 		topoServer:          topoServer,
 		healthcheck:         hc,
@@ -97,6 +102,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, hc HealthC
 		refreshInterval:     refreshInterval,
 		refreshKnownTablets: refreshKnownTablets,
 		tablets:             make(map[string]*tabletInfo),
+		options:             withOptions(opts...),
 	}
 	tw.firstLoadChan = make(chan struct{})
 
@@ -147,10 +153,10 @@ func (tw *TopologyWatcher) loadTablets() {
 		topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1)
 		// If we get a partial result error, we just log it and process the tablets that we did manage to fetch.
 		if topo.IsErrType(err, topo.PartialResult) {
-			log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err)
+			tw.logger().Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err)
 			partialResult = true
 		} else { // For all other errors, just return.
-			log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err)
+			tw.logger().Errorf("error getting tablets for cell: %v: %v", tw.cell, err)
 			return
 		}
 	}
@@ -243,7 +249,6 @@ func (tw *TopologyWatcher) loadTablets() {
 	}
 	tw.topoChecksum = crc32.ChecksumIEEE(buf.Bytes())
 	tw.lastRefresh = time.Now()
-
 }
 
 // RefreshLag returns the time since the last refresh.
@@ -262,6 +267,11 @@ func (tw *TopologyWatcher) TopoChecksum() uint32 {
 	return tw.topoChecksum
 }
 
+// logger returns the logutil.Logger used by the TopologyWatcher.
+func (tw *TopologyWatcher) logger() logutil.Logger {
+	return tw.options.logger
+}
+
 // TabletFilter is an interface that can be given to a TopologyWatcher
 // to be applied as an additional filter on the list of tablets returned by its getTablets function.
 type TabletFilter interface {
@@ -287,6 +297,9 @@ func (tf TabletFilters) IsIncluded(tablet *topodatapb.Tablet) bool {
 type FilterByShard struct {
 	// filters is a map of keyspace to filters for shards
 	filters map[string][]*filterShard
+	// options contains optional settings used to modify FilterByShard
+	// behavior.
+	options discoveryOptions
 }
 
 // filterShard describes a filter for a given shard or keyrange inside
@@ -295,6 +308,7 @@ type filterShard struct {
 	keyspace string
 	shard    string
 	keyRange *topodatapb.KeyRange // only set if shard is also a KeyRange
+	options  discoveryOptions
 }
 
 // NewFilterByShard creates a new FilterByShard for use by a
@@ -302,7 +316,7 @@ type filterShard struct {
 // can either be a shard name, or a keyrange. All tablets that match
 // at least one keyspace|shard tuple will be forwarded by the
 // TopologyWatcher to its consumer.
-func NewFilterByShard(filters []string) (*FilterByShard, error) {
+func NewFilterByShard(filters []string, opts ...DiscoveryOption) (*FilterByShard, error) {
 	m := make(map[string][]*filterShard)
 	for _, filter := range filters {
 		parts := strings.Split(filter, "|")
@@ -333,16 +347,19 @@ func NewFilterByShard(filters []string) (*FilterByShard, error) {
 		})
 	}
 
-	return &FilterByShard{
+	fbs := &FilterByShard{
 		filters: m,
-	}, nil
+		options: withOptions(opts...),
+	}
+
+	return fbs, nil
 }
 
 // IsIncluded returns true iff the tablet's keyspace and shard match what we have.
 func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool {
 	canonical, kr, err := topo.ValidateShardName(tablet.Shard)
 	if err != nil {
-		log.Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err)
+		fbs.logger().Errorf("Error parsing shard name %v, will ignore tablet: %v", tablet.Shard, err)
 		return false
 	}
 
@@ -359,6 +376,11 @@ func (fbs *FilterByShard) IsIncluded(tablet *topodatapb.Tablet) bool {
 	return false
 }
 
+// logger returns the logutil.Logger used by the FilterByShard.
+func (fbs *FilterByShard) logger() logutil.Logger {
+	return fbs.options.logger
+}
+
 // FilterByKeyspace is a filter that filters tablets by keyspace.
 type FilterByKeyspace struct {
 	keyspaces map[string]bool
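
Usage sketch (assumed caller-side code, not taken from this patch): the snippet below shows how the new DiscoveryOption can be threaded into NewHealthCheck. The helper name, cell names, durations, and the choice of logutil.NewMemoryLogger are illustrative assumptions; any logutil.Logger implementation works, and callers that pass no options keep the logutil.NewConsoleLogger default from defaultOptions.

    package example

    import (
    	"context"
    	"time"

    	"vitess.io/vitess/go/vt/discovery"
    	"vitess.io/vitess/go/vt/logutil"
    	"vitess.io/vitess/go/vt/topo"
    )

    // newHealthCheckWithLogger is a hypothetical helper showing how a custom
    // logger is passed through the discovery constructors via WithLogger.
    func newHealthCheckWithLogger(ctx context.Context, ts *topo.Server) discovery.HealthCheck {
    	logger := logutil.NewMemoryLogger() // assumed choice; any logutil.Logger works
    	return discovery.NewHealthCheck(
    		ctx,
    		1*time.Second, // retryDelay (assumed value)
    		time.Minute,   // healthCheckTimeout (assumed value)
    		ts,            // *topo.Server supplied by the caller
    		"zone1",       // localCell (assumed)
    		"zone1",       // cellsToWatch (assumed)
    		nil,           // no tablet filter
    		discovery.WithLogger(logger), // healthcheck and topology watchers log via logger
    	)
    }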