Skip to content

Commit

Permalink
go/vt/discovery: configurable logger
Browse files Browse the repository at this point in the history
Signed-off-by: Max Englander <max@planetscale.com>
  • Loading branch information
maxenglander committed Feb 21, 2025
1 parent ef84b32 commit bb30d44
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 32 deletions.
67 changes: 67 additions & 0 deletions go/vt/discovery/discovery_options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
Copyright 2025 The Vitess Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package discovery

import (
"vitess.io/vitess/go/vt/logutil"
)

// discoveryOptions configure a discovery components. discoveryOptions are set
// by the DiscoveryOption values passed to the component constructors.
type discoveryOptions struct {
logger logutil.Logger
}

// DiscoveryOption configures how we perform certain operations.
type DiscoveryOption interface {
apply(*discoveryOptions)
}

// funcDiscoveryOption wraps a function that modifies discoveryOptions into
// an implementation of the DiscoveryOption interface.
type funcDiscoveryOption struct {
f func(*discoveryOptions)
}

func defaultOptions() discoveryOptions {
return discoveryOptions{
logger: logutil.NewConsoleLogger(),
}
}

func withOptions(dos ...DiscoveryOption) discoveryOptions {
os := defaultOptions()
for _, do := range dos {
do.apply(&os)
}
return os
}

func (fhco *funcDiscoveryOption) apply(dos *discoveryOptions) {
fhco.f(dos)
}

func newFuncDiscoveryOption(f func(*discoveryOptions)) *funcDiscoveryOption {
return &funcDiscoveryOption{
f: f,
}
}

// WithLogger accepts a custom logger to use in a discovery component. If this
// option is not provided then the default system logger will be used.
func WithLogger(l logutil.Logger) DiscoveryOption {
return newFuncDiscoveryOption(func(o *discoveryOptions) {
o.logger = l
})
}
54 changes: 35 additions & 19 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/proto/query"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
Expand Down Expand Up @@ -188,8 +189,10 @@ func FilteringKeyspaces() bool {
return len(KeyspacesToWatch) > 0
}

type KeyspaceShardTabletType string
type tabletAliasString string
type (
KeyspaceShardTabletType string
tabletAliasString string
)

// HealthCheck declares what the TabletGateway needs from the HealthCheck
type HealthCheck interface {
Expand Down Expand Up @@ -299,6 +302,9 @@ type HealthCheckImpl struct {
subscribers map[chan *TabletHealth]struct{}
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
loadTabletsTrigger chan struct{}
// options contains optional settings used to modify HealthCheckImpl
// behavior.
options discoveryOptions
}

// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
Expand Down Expand Up @@ -350,9 +356,9 @@ func NewVTGateHealthCheckFilters() (filters TabletFilters, err error) {
// filters.
//
// Is one or more filters to apply when determining what tablets we want to stream healthchecks from.
func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter) *HealthCheckImpl {
log.Infof("loading tablets for cells: %v", cellsToWatch)

func NewHealthCheck(
ctx context.Context, retryDelay, healthCheckTimeout time.Duration, topoServer *topo.Server, localCell, cellsToWatch string, filters TabletFilter, opts ...DiscoveryOption,
) *HealthCheckImpl {
hc := &HealthCheckImpl{
ts: topoServer,
cell: localCell,
Expand All @@ -364,19 +370,23 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
subscribers: make(map[chan *TabletHealth]struct{}),
cellAliases: make(map[string]string),
loadTabletsTrigger: make(chan struct{}, 1),
options: withOptions(opts...),
}

hc.logger().Infof("loading tablets for cells: %v", cellsToWatch)

var topoWatchers []*TopologyWatcher
cells := strings.Split(cellsToWatch, ",")
if cellsToWatch == "" {
cells = append(cells, localCell)
}

for _, c := range cells {
log.Infof("Setting up healthcheck for cell: %v", c)
hc.logger().Infof("Setting up healthcheck for cell: %v", c)
if c == "" {
continue
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, opts...))
}

hc.topoWatchers = topoWatchers
Expand All @@ -401,7 +411,7 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
return
}

log.Infof("Adding tablet to healthcheck: %v", tablet)
hc.logger().Infof("Adding tablet to healthcheck: %v", tablet)
hc.mu.Lock()
defer hc.mu.Unlock()
if hc.healthByAlias == nil {
Expand All @@ -419,14 +429,15 @@ func (hc *HealthCheckImpl) AddTablet(tablet *topodata.Tablet) {
cancelFunc: cancelFunc,
Tablet: tablet,
Target: target,
logger: hc.logger(),
}

// add to our datastore
key := KeyFromTarget(target)
tabletAlias := topoproto.TabletAliasString(tablet.Alias)
if _, ok := hc.healthByAlias[tabletAliasString(tabletAlias)]; ok {
// We should not add a tablet that we already have
log.Errorf("Program bug: tried to add existing tablet: %v to healthcheck", tabletAlias)
hc.logger().Errorf("Program bug: tried to add existing tablet: %v to healthcheck", tabletAlias)
return
}
hc.healthByAlias[tabletAliasString(tabletAlias)] = thc
Expand Down Expand Up @@ -454,7 +465,7 @@ func (hc *HealthCheckImpl) ReplaceTablet(old, new *topodata.Tablet) {
}

func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
log.Infof("Removing tablet from healthcheck: %v", tablet)
hc.logger().Infof("Removing tablet from healthcheck: %v", tablet)
hc.mu.Lock()
defer hc.mu.Unlock()

Expand Down Expand Up @@ -497,7 +508,7 @@ func (hc *HealthCheckImpl) deleteTablet(tablet *topodata.Tablet) {
// delete from authoritative map
th, ok := hc.healthByAlias[tabletAlias]
if !ok {
log.Infof("We have no health data for tablet: %v, it might have been deleted already", tablet)
hc.logger().Infof("We have no health data for tablet: %v, it might have been deleted already", tablet)
return
}
// Calling this will end the context associated with th.checkConn,
Expand All @@ -516,7 +527,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
// so that we're not racing to update it and in effect re-adding a copy of the
// tablet record that was deleted
if _, ok := hc.healthByAlias[tabletAlias]; !ok {
log.Infof("Tablet %v has been deleted, skipping health update", th.Tablet)
hc.logger().Infof("Tablet %v has been deleted, skipping health update", th.Tablet)
return
}

Expand All @@ -541,7 +552,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
// causing an interruption where no primary is assigned to the shard.
if prevTarget.TabletType == topodata.TabletType_PRIMARY {
if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 {
log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
hc.logger().Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
// We want to trigger a loadTablets call, but if the channel is not empty
// then a trigger is already scheduled, we don't need to trigger another one.
// This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
Expand All @@ -567,7 +578,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
// We already have one up server, see if we
// need to replace it.
if th.PrimaryTermStartTime < hc.healthy[targetKey][0].PrimaryTermStartTime {
log.Warningf("not marking healthy primary %s as Up for %s because its PrimaryTermStartTime is smaller than the highest known timestamp from previous PRIMARYs %s: %d < %d ",
hc.logger().Warningf("not marking healthy primary %s as Up for %s because its PrimaryTermStartTime is smaller than the highest known timestamp from previous PRIMARYs %s: %d < %d ",
topoproto.TabletAliasString(th.Tablet.Alias),
topoproto.KeyspaceShardString(th.Target.Keyspace, th.Target.Shard),
topoproto.TabletAliasString(hc.healthy[targetKey][0].Tablet.Alias),
Expand Down Expand Up @@ -604,7 +615,7 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ

isNewPrimary := isPrimary && prevTarget.TabletType != topodata.TabletType_PRIMARY
if isNewPrimary {
log.Errorf("Adding 1 to PrimaryPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType)
hc.logger().Errorf("Adding 1 to PrimaryPromoted counter for target: %v, tablet: %v, tabletType: %v", prevTarget, topoproto.TabletAliasString(th.Tablet.Alias), th.Target.TabletType)
hcPrimaryPromotedCounters.Add([]string{th.Target.Keyspace, th.Target.Shard}, 1)
}

Expand Down Expand Up @@ -656,7 +667,7 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
default:
// If the channel is full, we drop the message.
hcChannelFullCounter.Add(1)
log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
hc.logger().Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
}
}
}
Expand Down Expand Up @@ -837,7 +848,7 @@ func (hc *HealthCheckImpl) waitForTablets(ctx context.Context, targets []*query.
timer.Stop()
for _, target := range targets {
if target != nil {
log.Infof("couldn't find tablets for target: %v", target)
hc.logger().Infof("couldn't find tablets for target: %v", target)
}
}
return ctx.Err()
Expand Down Expand Up @@ -966,7 +977,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) {
if err != nil {
// Error logged
if _, err := w.Write([]byte(err.Error())); err != nil {
log.Errorf("write to buffer error failed: %v", err)
hc.logger().Errorf("write to buffer error failed: %v", err)
}

return
Expand All @@ -977,7 +988,7 @@ func (hc *HealthCheckImpl) ServeHTTP(w http.ResponseWriter, _ *http.Request) {

// Error logged
if _, err := w.Write(buf.Bytes()); err != nil {
log.Errorf("write to buffer bytes failed: %v", err)
hc.logger().Errorf("write to buffer bytes failed: %v", err)
}
}

Expand Down Expand Up @@ -1018,6 +1029,11 @@ func (hc *HealthCheckImpl) stateChecksum() int64 {
return int64(crc32.ChecksumIEEE(buf.Bytes()))
}

// logger returns the logutil.Logger used by the healthcheck.
func (hc *HealthCheckImpl) logger() logutil.Logger {
return hc.options.logger
}

// TabletToMapKey creates a key to the map from tablet's host and ports.
// It should only be used in discovery and related module.
func TabletToMapKey(tablet *topodata.Tablet) string {
Expand Down
10 changes: 6 additions & 4 deletions go/vt/discovery/tablet_health_check.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"time"

"vitess.io/vitess/go/vt/grpcclient"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/logutil"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/topotools"
Expand Down Expand Up @@ -71,6 +71,8 @@ type tabletHealthCheck struct {
// possibly delete both these
loggedServingState bool
lastResponseTimestamp time.Time // timestamp of the last healthcheck response
// logger is used to log messages.
logger logutil.Logger
}

// String is defined because we want to print a []*tabletHealthCheck array nicely.
Expand Down Expand Up @@ -107,7 +109,7 @@ func (thc *tabletHealthCheck) setServingState(serving bool, reason string) {
if !thc.loggedServingState || (serving != thc.Serving) {
// Emit the log from a separate goroutine to avoid holding
// the th lock while logging is happening
log.Infof("HealthCheckUpdate(Serving State): tablet: %v serving %v => %v for %v/%v (%v) reason: %s",
thc.logger.Infof("HealthCheckUpdate(Serving State): tablet: %v serving %v => %v for %v/%v (%v) reason: %s",
topotools.TabletIdent(thc.Tablet),
thc.Serving,
serving,
Expand Down Expand Up @@ -294,7 +296,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
// the healthcheck cache again via the topology watcher.
// WARNING: Under no other circumstances should we be deleting the tablet here.
if strings.Contains(err.Error(), "health stats mismatch") {
log.Warningf("deleting tablet %v from healthcheck due to health stats mismatch", thc.Tablet)
thc.logger.Warningf("deleting tablet %v from healthcheck due to health stats mismatch", thc.Tablet)
hc.deleteTablet(thc.Tablet)
return
}
Expand Down Expand Up @@ -331,7 +333,7 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) {
}

func (thc *tabletHealthCheck) closeConnection(ctx context.Context, err error) {
log.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet, err)
thc.logger.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet, err)
thc.setServingState(false, err.Error())
thc.LastError = err
_ = thc.Conn.Close(ctx)
Expand Down
Loading

0 comments on commit bb30d44

Please sign in to comment.