diff --git a/server/cluster.go b/server/cluster.go
index 5e7a239a66f..8c89a5a2bb2 100644
--- a/server/cluster.go
+++ b/server/cluster.go
@@ -21,12 +21,13 @@ import (
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	log "github.com/pingcap/log"
 	errcode "github.com/pingcap/pd/pkg/error_code"
 	"github.com/pingcap/pd/pkg/logutil"
 	"github.com/pingcap/pd/server/core"
 	"github.com/pingcap/pd/server/namespace"
 	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 )
 
 var backgroundJobInterval = time.Minute
@@ -354,7 +355,9 @@ func (c *RaftCluster) putStore(store *metapb.Store) error {
 	// Check location labels.
 	for _, k := range c.cachedCluster.GetLocationLabels() {
 		if v := s.GetLabelValue(k); len(v) == 0 {
-			log.Warnf("missing location label %q in store %v", k, s)
+			log.Warn("missing location label",
+				zap.Stringer("store", s.Store),
+				zap.String("label-key", k))
 		}
 	}
 	return cluster.putStore(s)
@@ -384,7 +387,9 @@ func (c *RaftCluster) RemoveStore(storeID uint64) error {
 	}
 
 	store.State = metapb.StoreState_Offline
-	log.Warnf("[store %d] store %s has been Offline", store.GetId(), store.GetAddress())
+	log.Warn("store has been offline",
+		zap.Uint64("store-id", store.GetId()),
+		zap.String("store-address", store.GetAddress()))
 	return cluster.putStore(store)
 }
 
@@ -412,11 +417,13 @@ func (c *RaftCluster) BuryStore(storeID uint64, force bool) error { // revive:di
 		if !force {
 			return errors.New("store is still up, please remove store gracefully")
 		}
-		log.Warnf("forcedly bury store %v", store)
+		log.Warn("forcedly bury store", zap.Stringer("store", store.Store))
 	}
 
 	store.State = metapb.StoreState_Tombstone
-	log.Warnf("[store %d] store %s has been Tombstone", store.GetId(), store.GetAddress())
+	log.Warn("store has been Tombstone",
+		zap.Uint64("store-id", store.GetId()),
+		zap.String("store-address", store.GetAddress()))
 	return cluster.putStore(store)
 }
 
@@ -433,7 +440,9 @@ func (c *RaftCluster) SetStoreState(storeID uint64, state metapb.StoreState) err
 	}
 
 	store.State = state
-	log.Warnf("[store %d] set state to %v", storeID, state.String())
+	log.Warn("store update state",
+		zap.Uint64("store-id", storeID),
+		zap.Stringer("new-state", state))
 	return cluster.putStore(store)
 }
 
@@ -476,7 +485,9 @@ func (c *RaftCluster) checkStores() {
 		// If the store is empty, it can be buried.
 		if cluster.getStoreRegionCount(offlineStore.GetId()) == 0 {
 			if err := c.BuryStore(offlineStore.GetId(), false); err != nil {
-				log.Errorf("bury store %v failed: %v", offlineStore, err)
+				log.Error("bury store failed",
+					zap.Stringer("store", offlineStore),
+					zap.Error(err))
 			}
 		} else {
 			offlineStores = append(offlineStores, offlineStore)
@@ -489,7 +500,7 @@ func (c *RaftCluster) checkStores() {
 
 	if upStoreCount < c.s.GetConfig().Replication.MaxReplicas {
 		for _, offlineStore := range offlineStores {
-			log.Warnf("store %v may not turn into Tombstone, there are no extra up node has enough space to accommodate the extra replica", offlineStore)
+			log.Warn("store may not turn into Tombstone, there are no extra up node has enough space to accommodate the extra replica", zap.Stringer("store", offlineStore))
 		}
 	}
 }
@@ -500,13 +511,18 @@ func (c *RaftCluster) checkOperators() {
 		// after region is merged, it will not heartbeat anymore
 		// the operator of merged region will not timeout actively
 		if c.cachedCluster.GetRegion(op.RegionID()) == nil {
-			log.Debugf("remove operator %v cause region %d is merged", op, op.RegionID())
+			log.Debug("remove operator cause region is merged",
+				zap.Uint64("region-id", op.RegionID()),
+				zap.Stringer("operator", op))
 			co.removeOperator(op)
 			continue
 		}
 
 		if op.IsTimeout() {
-			log.Infof("[region %v] operator timeout: %s", op.RegionID(), op)
+			log.Info("operator timeout",
+				zap.Uint64("region-id", op.RegionID()),
+				zap.Stringer("operator", op))
+			operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
 			co.removeOperator(op)
 		}
 
@@ -531,7 +547,7 @@ func (c *RaftCluster) collectHealthStatus() {
 	client := c.s.GetClient()
 	members, err := GetMembers(client)
 	if err != nil {
-		log.Info("get members error:", err)
+		log.Error("get members error", zap.Error(err))
 	}
 	unhealth := c.s.CheckHealth(members)
 	for _, member := range members {
diff --git a/server/cluster_info.go b/server/cluster_info.go
index 7bf45a042a4..d39f5a4363a 100644
--- a/server/cluster_info.go
+++ b/server/cluster_info.go
@@ -21,10 +21,11 @@ import (
 	"github.com/gogo/protobuf/proto"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	log "github.com/pingcap/log"
 	"github.com/pingcap/pd/server/core"
 	"github.com/pingcap/pd/server/namespace"
 	"github.com/pingcap/pd/server/schedule"
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 )
 
 type clusterInfo struct {
@@ -68,13 +69,19 @@ func loadClusterInfo(id core.IDAllocator, kv *core.KV, opt *scheduleOption) (*cl
 	if err := kv.LoadStores(c.core.Stores); err != nil {
 		return nil, err
 	}
-	log.Infof("load %v stores cost %v", c.core.Stores.GetStoreCount(), time.Since(start))
+	log.Info("load stores",
+		zap.Int("count", c.core.Stores.GetStoreCount()),
+		zap.Duration("cost", time.Since(start)),
+	)
 
 	start = time.Now()
 	if err := kv.LoadRegions(c.core.Regions); err != nil {
 		return nil, err
 	}
-	log.Infof("load %v regions cost %v", c.core.Regions.GetRegionCount(), time.Since(start))
+	log.Info("load regions",
+		zap.Int("count", c.core.Regions.GetRegionCount()),
+		zap.Duration("cost", time.Since(start)),
+	)
 
 	return c, nil
 }
@@ -101,9 +108,11 @@ func (c *clusterInfo) OnStoreVersionChange() {
 		c.opt.SetClusterVersion(*minVersion)
 		err := c.opt.persist(c.kv)
 		if err != nil {
-			log.Infof("persist cluster version meet error: %s", err)
+			log.Error("persist cluster version meet error", zap.Error(err))
 		}
-		log.Infof("cluster version changed from %s to %s", clusterVersion, minVersion)
+		log.Info("cluster version changed",
+			zap.Stringer("old-cluster-version", clusterVersion),
+ zap.Stringer("new-cluster-version", minVersion)) CheckPDVersion(c.opt) } } @@ -123,7 +132,7 @@ func (c *clusterInfo) allocID() (uint64, error) { func (c *clusterInfo) AllocPeer(storeID uint64) (*metapb.Peer, error) { peerID, err := c.allocID() if err != nil { - log.Errorf("failed to alloc peer: %v", err) + log.Error("failed to alloc peer", zap.Error(err)) return nil, err } peer := &metapb.Peer{ @@ -471,7 +480,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { // Mark isNew if the region in cache does not have leader. var saveKV, saveCache, isNew bool if origin == nil { - log.Debugf("[region %d] Insert new region {%v}", region.GetID(), core.HexRegionMeta(region.GetMeta())) + log.Debug("insert new region", + zap.Uint64("region-id", region.GetID()), + zap.Reflect("meta-region", core.HexRegionMeta(region.GetMeta())), + ) saveKV, saveCache, isNew = true, true, true } else { r := region.GetRegionEpoch() @@ -481,18 +493,32 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { return ErrRegionIsStale(region.GetMeta(), origin.GetMeta()) } if r.GetVersion() > o.GetVersion() { - log.Infof("[region %d] %s, Version changed from {%d} to {%d}", region.GetID(), core.DiffRegionKeyInfo(origin, region), o.GetVersion(), r.GetVersion()) + log.Info("region Version changed", + zap.Uint64("region-id", region.GetID()), + zap.String("detail", core.DiffRegionKeyInfo(origin, region)), + zap.Uint64("old-version", o.GetVersion()), + zap.Uint64("new-version", r.GetVersion()), + ) saveKV, saveCache = true, true } if r.GetConfVer() > o.GetConfVer() { - log.Infof("[region %d] %s, ConfVer changed from {%d} to {%d}", region.GetID(), core.DiffRegionPeersInfo(origin, region), o.GetConfVer(), r.GetConfVer()) + log.Info("region ConfVer changed", + zap.Uint64("region-id", region.GetID()), + zap.String("detail", core.DiffRegionPeersInfo(origin, region)), + zap.Uint64("old-confver", o.GetConfVer()), + zap.Uint64("new-confver", r.GetConfVer()), + ) saveKV, saveCache = true, true } if region.GetLeader().GetId() != origin.GetLeader().GetId() { if origin.GetLeader().GetId() == 0 { isNew = true } else { - log.Infof("[region %d] Leader changed from store {%d} to {%d}", region.GetID(), origin.GetLeader().GetStoreId(), region.GetLeader().GetStoreId()) + log.Info("leader changed", + zap.Uint64("region-id", region.GetID()), + zap.Uint64("from", origin.GetLeader().GetStoreId()), + zap.Uint64("to", region.GetLeader().GetStoreId()), + ) } saveCache = true } @@ -517,7 +543,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { if err := c.kv.SaveRegion(region.GetMeta()); err != nil { // Not successfully saved to kv is not fatal, it only leads to longer warm-up // after restart. Here we only log the error then go on updating cache. 
- log.Errorf("[region %d] fail to save region %v: %v", region.GetID(), core.HexRegionMeta(region.GetMeta()), err) + log.Error("fail to save region to kv", + zap.Uint64("region-id", region.GetID()), + zap.Reflect("region-meta", core.HexRegionMeta(region.GetMeta())), + zap.Error(err)) } } if !isWriteUpdate && !isReadUpdate && !saveCache && !isNew { @@ -535,7 +564,10 @@ func (c *clusterInfo) handleRegionHeartbeat(region *core.RegionInfo) error { if c.kv != nil { for _, item := range overlaps { if err := c.kv.DeleteRegion(item); err != nil { - log.Errorf("[region %d] fail to delete region %v: %v", item.GetId(), core.HexRegionMeta(item), err) + log.Error("fail to delete region from kv", + zap.Uint64("region-id", item.GetId()), + zap.Reflect("region-meta", core.HexRegionMeta(item)), + zap.Error(err)) } } } diff --git a/server/cluster_worker.go b/server/cluster_worker.go index b42393198da..7d3622dab25 100644 --- a/server/cluster_worker.go +++ b/server/cluster_worker.go @@ -19,9 +19,10 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" "github.com/pingcap/pd/server/core" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) // HandleRegionHeartbeat processes RegionInfo reports from client. @@ -32,7 +33,7 @@ func (c *RaftCluster) HandleRegionHeartbeat(region *core.RegionInfo) error { // If the region peer count is 0, then we should not handle this. if len(region.GetPeers()) == 0 { - log.Warnf("invalid region, zero region peer count: %v", core.HexRegionMeta(region.GetMeta())) + log.Warn("invalid region, zero region peer count", zap.Reflect("region-meta", core.HexRegionMeta(region.GetMeta()))) return errors.Errorf("invalid region, zero region peer count: %v", core.HexRegionMeta(region.GetMeta())) } @@ -163,7 +164,10 @@ func (c *RaftCluster) handleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb err := c.checkSplitRegion(left, right) if err != nil { - log.Warnf("report split region is invalid: %v, %v, %v", core.HexRegionMeta(left), core.HexRegionMeta(right), err) + log.Warn("report split region is invalid", + zap.Reflect("left-region", core.HexRegionMeta(left)), + zap.Reflect("right-region", core.HexRegionMeta(right)), + zap.Error(err)) return nil, err } @@ -171,7 +175,9 @@ func (c *RaftCluster) handleReportSplit(request *pdpb.ReportSplitRequest) (*pdpb originRegion := proto.Clone(right).(*metapb.Region) originRegion.RegionEpoch = nil originRegion.StartKey = left.GetStartKey() - log.Infof("[region %d] region split, generate new region: %v", originRegion.GetId(), core.HexRegionMeta(left)) + log.Info("region split, generate new region", + zap.Uint64("region-id", originRegion.GetId()), + zap.Reflect("region-meta", core.HexRegionMeta(left))) return &pdpb.ReportSplitResponse{}, nil } @@ -184,11 +190,16 @@ func (c *RaftCluster) handleBatchReportSplit(request *pdpb.ReportBatchSplitReque err := c.checkSplitRegions(regions) if err != nil { - log.Warnf("report batch split region is invalid: %v, %v", hexRegionMetas, err) + log.Warn("report batch split region is invalid", + zap.Reflect("region-meta", hexRegionMetas), + zap.Error(err)) return nil, err } last := len(regions) - 1 originRegion := proto.Clone(regions[last]).(*metapb.Region) - log.Infof("[region %d] region split, generate %d new regions: %v", originRegion.GetId(), last, hexRegionMetas[:last]) + log.Info("region batch split, generate new regions", + zap.Uint64("region-id", originRegion.GetId()), + zap.Reflect("origin", 
+		zap.Int("total", last))
 	return &pdpb.ReportBatchSplitResponse{}, nil
 }
diff --git a/server/coordinator.go b/server/coordinator.go
index c9694b3432c..e35d1b06af8 100644
--- a/server/coordinator.go
+++ b/server/coordinator.go
@@ -23,13 +23,14 @@ import (
 	"github.com/pingcap/kvproto/pkg/eraftpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	log "github.com/pingcap/log"
 	"github.com/pingcap/pd/pkg/logutil"
 	"github.com/pingcap/pd/server/cache"
 	"github.com/pingcap/pd/server/core"
 	"github.com/pingcap/pd/server/namespace"
 	"github.com/pingcap/pd/server/schedule"
 	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 )
 
 const (
@@ -101,14 +102,14 @@ func (c *coordinator) dispatch(region *core.RegionInfo) {
 			return
 		}
 		if op.IsFinish() {
-			log.Infof("[region %v] operator finish: %s", region.GetID(), op)
+			log.Info("operator finish", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op))
 			operatorCounter.WithLabelValues(op.Desc(), "finish").Inc()
 			operatorDuration.WithLabelValues(op.Desc()).Observe(op.ElapsedTime().Seconds())
 			c.pushHistory(op)
 			c.opRecords.Put(op, pdpb.OperatorStatus_SUCCESS)
 			c.removeOperator(op)
 		} else if timeout {
-			log.Infof("[region %v] operator timeout: %s", region.GetID(), op)
+			log.Info("operator timeout", zap.Uint64("region-id", region.GetID()), zap.Reflect("operator", op))
 			operatorCounter.WithLabelValues(op.Desc(), "timeout").Inc()
 			c.removeOperator(op)
 			c.opRecords.Put(op, pdpb.OperatorStatus_TIMEOUT)
@@ -232,16 +233,16 @@ func (c *coordinator) run() {
 		if schedulerCfg.Disable {
 			scheduleCfg.Schedulers[k] = schedulerCfg
 			k++
-			log.Info("skip create ", schedulerCfg.Type)
+			log.Info("skip create scheduler", zap.String("scheduler-type", schedulerCfg.Type))
 			continue
 		}
 		s, err := schedule.CreateScheduler(schedulerCfg.Type, c.limiter, schedulerCfg.Args...)
 		if err != nil {
-			log.Fatalf("can not create scheduler %s: %v", schedulerCfg.Type, err)
+			log.Fatal("can not create scheduler", zap.String("scheduler-type", schedulerCfg.Type), zap.Error(err))
 		}
-		log.Infof("create scheduler %s", s.GetName())
+		log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
 		if err = c.addScheduler(s, schedulerCfg.Args...); err != nil {
-			log.Errorf("can not add scheduler %s: %v", s.GetName(), err)
+			log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Error(err))
 		}
 
 		// only record valid scheduler config
@@ -254,7 +255,7 @@ func (c *coordinator) run() {
 	// remove invalid scheduler config and persist
 	scheduleCfg.Schedulers = scheduleCfg.Schedulers[:k]
 	if err := c.cluster.opt.persist(c.cluster.kv); err != nil {
-		log.Errorf("can't persist schedule config: %v", err)
+		log.Error("cannot persist schedule config", zap.Error(err))
 	}
 
 	c.wg.Add(1)
@@ -439,7 +440,9 @@ func (c *coordinator) runScheduler(s *scheduleController) {
 			}
 
 		case <-s.Ctx().Done():
-			log.Infof("%v has been stopped: %v", s.GetName(), s.Ctx().Err())
+			log.Info("stopped scheduler",
+				zap.String("scheduler-name", s.GetName()),
+				zap.Error(s.Ctx().Err()))
 			return
 		}
 	}
@@ -448,12 +451,12 @@ func (c *coordinator) addOperatorLocked(op *schedule.Operator) bool {
 	regionID := op.RegionID()
 
-	log.Infof("[region %v] add operator: %s", regionID, op)
+	log.Info("add operator", zap.Uint64("region-id", regionID), zap.Reflect("operator", op))
 
 	// If there is an old operator, replace it. The priority should be checked
 	// already.
 	if old, ok := c.operators[regionID]; ok {
-		log.Infof("[region %v] replace old operator: %s", regionID, old)
+		log.Info("replace old operator", zap.Uint64("region-id", regionID), zap.Reflect("operator", old))
 		operatorCounter.WithLabelValues(old.Desc(), "replaced").Inc()
 		c.opRecords.Put(old, pdpb.OperatorStatus_REPLACE)
 		c.removeOperatorLocked(old)
@@ -493,15 +496,15 @@ func (c *coordinator) addOperator(ops ...*schedule.Operator) bool {
 func (c *coordinator) checkAddOperator(op *schedule.Operator) bool {
 	region := c.cluster.GetRegion(op.RegionID())
 	if region == nil {
-		log.Debugf("[region %v] region not found, cancel add operator", op.RegionID())
+		log.Debug("region not found, cancel add operator", zap.Uint64("region-id", op.RegionID()))
 		return false
 	}
 	if region.GetRegionEpoch().GetVersion() != op.RegionEpoch().GetVersion() || region.GetRegionEpoch().GetConfVer() != op.RegionEpoch().GetConfVer() {
-		log.Debugf("[region %v] region epoch not match, %v vs %v, cancel add operator", op.RegionID(), region.GetRegionEpoch(), op.RegionEpoch())
+		log.Debug("region epoch not match, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", region.GetRegionEpoch()), zap.Reflect("new", op.RegionEpoch()))
 		return false
 	}
 	if old := c.operators[op.RegionID()]; old != nil && !isHigherPriorityOperator(op, old) {
-		log.Debugf("[region %v] already have operator %s, cancel add operator", op.RegionID(), old)
+		log.Debug("already have operator, cancel add operator", zap.Uint64("region-id", op.RegionID()), zap.Reflect("old", old))
 		return false
 	}
 	return true
@@ -576,7 +579,7 @@ func (c *coordinator) getHistory(start time.Time) []schedule.OperatorHistory {
 }
 
 func (c *coordinator) sendScheduleCommand(region *core.RegionInfo, step schedule.OperatorStep) {
-	log.Infof("[region %v] send schedule command: %s", region.GetID(), step)
+	log.Info("send schedule command", zap.Uint64("region-id", region.GetID()), zap.Stringer("step", step))
 	switch s := step.(type) {
 	case schedule.TransferLeader:
 		cmd := &pdpb.RegionHeartbeatResponse{
@@ -654,7 +657,7 @@ func (c *coordinator) sendScheduleCommand(region *core.RegionInfo, step schedule
 		}
 		c.hbStreams.sendMsg(region, cmd)
 	default:
-		log.Errorf("unknown operatorStep: %v", step)
+		log.Error("unknown operator step", zap.Reflect("step", step))
 	}
 }
diff --git a/server/etcd_kv.go b/server/etcd_kv.go
index 8553ec308b4..55a67baf027 100644
--- a/server/etcd_kv.go
+++ b/server/etcd_kv.go
@@ -19,8 +19,9 @@ import (
 	"time"
 
 	"github.com/coreos/etcd/clientv3"
+	log "github.com/pingcap/log"
 	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 )
 
 const (
@@ -83,7 +84,7 @@ func (kv *etcdKVBase) Save(key, value string) error {
 
 	resp, err := kv.server.leaderTxn().Then(clientv3.OpPut(key, value)).Commit()
 	if err != nil {
-		log.Errorf("save to etcd error: %v", err)
+		log.Error("save to etcd meet error", zap.Error(err))
 		return errors.WithStack(err)
 	}
 	if !resp.Succeeded {
@@ -97,7 +98,7 @@ func (kv *etcdKVBase) Delete(key string) error {
 
 	resp, err := kv.server.leaderTxn().Then(clientv3.OpDelete(key)).Commit()
 	if err != nil {
-		log.Errorf("delete from etcd error: %v", err)
+		log.Error("delete from etcd meet error", zap.Error(err))
 		return errors.WithStack(err)
 	}
 	if !resp.Succeeded {
@@ -113,10 +114,10 @@ func kvGet(c *clientv3.Client, key string, opts ...clientv3.OpOption) (*clientv3
 	start := time.Now()
 	resp, err := clientv3.NewKV(c).Get(ctx, key, opts...)
 	if err != nil {
-		log.Errorf("load from etcd error: %v", err)
+		log.Error("load from etcd meet error", zap.Error(err))
 	}
 	if cost := time.Since(start); cost > kvSlowRequestTime {
-		log.Warnf("kv gets too slow: key %v cost %v err %v", key, cost, err)
+		log.Warn("kv gets too slow", zap.String("request-key", key), zap.Duration("cost", cost), zap.Error(err))
 	}
 
 	return resp, errors.WithStack(err)
diff --git a/server/grpc_service.go b/server/grpc_service.go
index 669c2422252..95dc8a99d83 100644
--- a/server/grpc_service.go
+++ b/server/grpc_service.go
@@ -14,6 +14,7 @@
 package server
 
 import (
+	"context"
 	"fmt"
 	"io"
 	"strconv"
@@ -22,10 +23,10 @@ import (
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	log "github.com/pingcap/log"
 	"github.com/pingcap/pd/server/core"
 	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
-	"golang.org/x/net/context"
+	"go.uber.org/zap"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 )
@@ -210,7 +211,9 @@ func (s *Server) PutStore(ctx context.Context, request *pdpb.PutStoreRequest) (*
 		return nil, status.Errorf(codes.Unknown, err.Error())
 	}
 
-	log.Infof("put store ok - %v", store)
+	log.Info("put store ok", zap.Stringer("store", store))
+	cluster.RLock()
+	defer cluster.RUnlock()
 	cluster.cachedCluster.OnStoreVersionChange()
 
 	return &pdpb.PutStoreResponse{
@@ -567,7 +570,7 @@ func (s *Server) PutClusterConfig(ctx context.Context, request *pdpb.PutClusterC
 		return nil, status.Errorf(codes.Unknown, err.Error())
 	}
 
-	log.Infof("put cluster config ok - %v", conf)
+	log.Info("put cluster config ok", zap.Reflect("config", conf))
 
 	return &pdpb.PutClusterConfigResponse{
 		Header: s.header(),
@@ -654,9 +657,12 @@ func (s *Server) UpdateGCSafePoint(ctx context.Context, request *pdpb.UpdateGCSa
 		if err := s.kv.SaveGCSafePoint(newSafePoint); err != nil {
 			return nil, err
 		}
-		log.Infof("updated gc safe point to %d", newSafePoint)
+		log.Info("updated gc safe point",
+			zap.Uint64("safe-point", newSafePoint))
 	} else if newSafePoint < oldSafePoint {
-		log.Warnf("trying to update gc safe point from %d to %d", oldSafePoint, newSafePoint)
+		log.Warn("trying to update gc safe point",
+			zap.Uint64("old-safe-point", oldSafePoint),
+			zap.Uint64("new-safe-point", newSafePoint))
 		newSafePoint = oldSafePoint
 	}
 
diff --git a/server/handler.go b/server/handler.go
index 244258bff82..d39ed77abd0 100644
--- a/server/handler.go
+++ b/server/handler.go
@@ -21,10 +21,11 @@ import (
 
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	log "github.com/pingcap/log"
 	"github.com/pingcap/pd/server/core"
 	"github.com/pingcap/pd/server/schedule"
 	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 )
 
 var (
@@ -159,11 +160,11 @@ func (h *Handler) AddScheduler(name string, args ...string) error {
 	if err != nil {
 		return err
 	}
-	log.Infof("create scheduler %s", s.GetName())
+	log.Info("create scheduler", zap.String("scheduler-name", s.GetName()))
 	if err = c.addScheduler(s, args...); err != nil {
-		log.Errorf("can not add scheduler %v: %v", s.GetName(), err)
+		log.Error("can not add scheduler", zap.String("scheduler-name", s.GetName()), zap.Error(err))
 	} else if err = h.opt.persist(c.cluster.kv); err != nil {
-		log.Errorf("can not persist scheduler config: %v", err)
+		log.Error("can not persist scheduler config", zap.Error(err))
 	}
 	return err
 }
@@ -175,9 +176,9 @@ func (h *Handler) RemoveScheduler(name string) error {
 		return err
 	}
 	if err = c.removeScheduler(name); err != nil {
-		log.Errorf("can not remove scheduler %v: %v", name, err)
+		log.Error("can not remove scheduler", zap.String("scheduler-name", name), zap.Error(err))
 	} else if err = h.opt.persist(c.cluster.kv); err != nil {
-		log.Errorf("can not persist scheduler config: %v", err)
+		log.Error("can not persist scheduler config", zap.Error(err))
 	}
 	return err
 }
diff --git a/server/heartbeat_stream_test.go b/server/heartbeat_stream_test.go
index 7d0746ec4a2..febe3c1990e 100644
--- a/server/heartbeat_stream_test.go
+++ b/server/heartbeat_stream_test.go
@@ -20,8 +20,10 @@ import (
 	. "github.com/pingcap/check"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	log "github.com/pingcap/log"
 	"github.com/pingcap/pd/pkg/testutil"
 	"github.com/pingcap/pd/pkg/typeutil"
+	"go.uber.org/zap"
 )
 
 var _ = Suite(&testHeartbeatStreamSuite{})
@@ -113,11 +115,15 @@ func newRegionheartbeatClient(c *C, grpcClient pdpb.PDClient) *regionHeartbeatCl
 }
 
 func (c *regionHeartbeatClient) close() {
-	c.stream.CloseSend()
+	if err := c.stream.CloseSend(); err != nil {
+		log.Error("failed to terminate client stream", zap.Error(err))
+	}
 }
 
 func (c *regionHeartbeatClient) SendRecv(msg *pdpb.RegionHeartbeatRequest, timeout time.Duration) *pdpb.RegionHeartbeatResponse {
-	c.stream.Send(msg)
+	if err := c.stream.Send(msg); err != nil {
+		log.Error("send heartbeat message fail", zap.Error(err))
+	}
 	select {
 	case <-time.After(timeout):
 		return nil
diff --git a/server/heartbeat_streams.go b/server/heartbeat_streams.go
index 60e53db5c49..c224e620f78 100644
--- a/server/heartbeat_streams.go
+++ b/server/heartbeat_streams.go
@@ -20,9 +20,10 @@ import (
 	"time"
 
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	log "github.com/pingcap/log"
 	"github.com/pingcap/pd/pkg/logutil"
 	"github.com/pingcap/pd/server/core"
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 )
 
 const heartbeatStreamKeepAliveInterval = time.Minute
@@ -80,21 +81,24 @@ func (s *heartbeatStreams) run() {
 			storeLabel := strconv.FormatUint(storeID, 10)
 			if stream, ok := s.streams[storeID]; ok {
 				if err := stream.Send(msg); err != nil {
-					log.Errorf("[region %v] send heartbeat message fail: %v", msg.RegionId, err)
+					log.Error("send heartbeat message fail",
+						zap.Uint64("region-id", msg.RegionId), zap.Error(err))
 					delete(s.streams, storeID)
 					regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "err").Inc()
 				} else {
 					regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "ok").Inc()
 				}
 			} else {
-				log.Debugf("[region %v] heartbeat stream not found for store %v, skip send message", msg.RegionId, storeID)
+				log.Debug("heartbeat stream not found, skip send message", zap.Uint64("region-id", msg.RegionId), zap.Uint64("store-id", storeID))
 				regionHeartbeatCounter.WithLabelValues(storeLabel, "push", "skip").Inc()
 			}
 		case <-keepAliveTicker.C:
 			for storeID, stream := range s.streams {
 				storeLabel := strconv.FormatUint(storeID, 10)
 				if err := stream.Send(keepAlive); err != nil {
-					log.Errorf("[store %v] send keepalive message fail: %v", storeID, err)
+					log.Error("send keepalive message fail",
+						zap.Uint64("target-store-id", storeID),
+						zap.Error(err))
 					delete(s.streams, storeID)
 					regionHeartbeatCounter.WithLabelValues(storeLabel, "keepalive", "err").Inc()
 				} else {
diff --git a/server/id.go b/server/id.go
index 17b3d3d8ec0..3c6f7b4ff17 100644
--- a/server/id.go
+++ b/server/id.go
@@ -17,8 +17,9 @@ import (
 	"sync"
 
 	"github.com/coreos/etcd/clientv3"
+	log "github.com/pingcap/log"
 	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 )
 
 const (
@@ -87,7 +88,7 @@ func (alloc *idAllocator) generate() (uint64, error) {
return 0, errors.New("generate id failed, we may not leader") } - log.Infof("idAllocator allocates a new id: %d", end) + log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end)) metadataGauge.WithLabelValues("idalloc").Set(float64(end)) return end, nil } diff --git a/server/join.go b/server/join.go index 3de0d21c94d..57eebf4f141 100644 --- a/server/join.go +++ b/server/join.go @@ -22,9 +22,10 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/embed" + log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/etcdutil" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) const ( @@ -86,7 +87,7 @@ func PrepareJoinCluster(cfg *Config) error { if _, err := os.Stat(filePath); !os.IsNotExist(err) { s, err := ioutil.ReadFile(filePath) if err != nil { - log.Fatal("read the join config meet error: ", err) + log.Fatal("read the join config meet error", zap.Error(err)) } cfg.InitialCluster = strings.TrimSpace(string(s)) cfg.InitialClusterState = embed.ClusterStateFlagExisting @@ -176,14 +177,14 @@ func PrepareJoinCluster(cfg *Config) error { func isDataExist(d string) bool { dir, err := os.Open(d) if err != nil { - log.Error("failed to open:", err) + log.Error("failed to open directory", zap.Error(err)) return false } defer dir.Close() names, err := dir.Readdirnames(-1) if err != nil { - log.Error("failed to list:", err) + log.Error("failed to list directory", zap.Error(err)) return false } return len(names) != 0 diff --git a/server/leader.go b/server/leader.go index 2f226a1474d..5fbd73d2499 100644 --- a/server/leader.go +++ b/server/leader.go @@ -15,7 +15,6 @@ package server import ( "context" - "fmt" "math/rand" "path" "strings" @@ -24,10 +23,11 @@ import ( "github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/mvcc/mvccpb" "github.com/pingcap/kvproto/pkg/pdpb" + log "github.com/pingcap/log" "github.com/pingcap/pd/pkg/etcdutil" "github.com/pingcap/pd/pkg/logutil" "github.com/pkg/errors" - log "github.com/sirupsen/logrus" + "go.uber.org/zap" ) // The timeout to wait transfer etcd leader to complete. @@ -75,7 +75,7 @@ func (s *Server) leaderLoop() { for { if s.isClosed() { - log.Infof("server is closed, return leader loop") + log.Info("server is closed, return leader loop") return } @@ -87,7 +87,7 @@ func (s *Server) leaderLoop() { leader, rev, err := getLeader(s.client, s.getLeaderPath()) if err != nil { - log.Errorf("get leader err %v", err) + log.Error("get leader meet error", zap.Error(err)) time.Sleep(200 * time.Millisecond) continue } @@ -95,14 +95,14 @@ func (s *Server) leaderLoop() { if s.isSameLeader(leader) { // oh, we are already leader, we may meet something wrong // in previous campaignLeader. we can delete and campaign again. 
- log.Warnf("leader is still %s, delete and campaign again", leader) + log.Warn("the leader has not changed, delete and campaign again", zap.Stringer("old-leader", leader)) if err = s.deleteLeaderKey(); err != nil { - log.Errorf("delete leader key err %s", err) + log.Error("delete leader key meet error", zap.Error(err)) time.Sleep(200 * time.Millisecond) continue } } else { - log.Infof("leader is %s, watch it", leader) + log.Info("start watch leader", zap.Stringer("leader", leader)) s.watchLeader(leader, rev) log.Info("leader changed, try to campaign leader") } @@ -110,13 +110,15 @@ func (s *Server) leaderLoop() { etcdLeader := s.GetEtcdLeader() if etcdLeader != s.ID() { - log.Infof("%v is not etcd leader, skip campaign leader and check later", s.Name()) + log.Info("skip campaign leader and check later", + zap.String("server-name", s.Name()), + zap.Uint64("etcd-leader-id", etcdLeader)) time.Sleep(200 * time.Millisecond) continue } if err = s.campaignLeader(); err != nil { - log.Errorf("campaign leader err %s", fmt.Sprintf("%+v", err)) + log.Error("campaign leader meet error", zap.Error(err)) } } } @@ -136,20 +138,22 @@ func (s *Server) etcdLeaderLoop() { } myPriority, err := s.GetMemberLeaderPriority(s.ID()) if err != nil { - log.Errorf("failed to load leader priority: %v", err) + log.Error("failed to load leader priority", zap.Error(err)) break } leaderPriority, err := s.GetMemberLeaderPriority(etcdLeader) if err != nil { - log.Errorf("failed to load leader priority: %v", err) + log.Error("failed to load leader priority", zap.Error(err)) break } if myPriority > leaderPriority { err := s.MoveEtcdLeader(ctx, etcdLeader, s.ID()) if err != nil { - log.Errorf("failed to transfer etcd leader: %v", err) + log.Error("failed to transfer etcd leader", zap.Error(err)) } else { - log.Infof("etcd leader moved from %v to %v", etcdLeader, s.ID()) + log.Info("transfer etcd leader", + zap.Uint64("from", etcdLeader), + zap.Uint64("to", s.ID())) } } case <-ctx.Done(): @@ -200,14 +204,14 @@ func (s *Server) memberInfo() (member *pdpb.Member, marshalStr string) { data, err := leader.Marshal() if err != nil { // can't fail, so panic here. 
- log.Fatalf("marshal leader %s err %v", leader, err) + log.Fatal("marshal leader meet error", zap.Stringer("leader", leader), zap.Error(err)) } return leader, string(data) } func (s *Server) campaignLeader() error { - log.Infof("start to campaign leader %s", s.Name()) + log.Info("begin to campaign leader", zap.String("campaign-leader-name", s.Name())) lessor := clientv3.NewLease(s.client) defer func() { @@ -221,7 +225,7 @@ func (s *Server) campaignLeader() error { cancel() if cost := time.Since(start); cost > slowRequestTime { - log.Warnf("lessor grants too slow, cost %s", cost) + log.Warn("lessor grants too slow", zap.Duration("cost", cost)) } if err != nil { @@ -249,7 +253,7 @@ func (s *Server) campaignLeader() error { if err != nil { return errors.WithStack(err) } - log.Infof("campaign leader ok %s", s.Name()) + log.Info("campaign leader ok", zap.String("campaign-leader-name", s.Name())) err = s.scheduleOpt.reload(s.kv) if err != nil { @@ -273,8 +277,8 @@ func (s *Server) campaignLeader() error { s.enableLeader() defer s.disableLeader() - log.Infof("cluster version is %s", s.scheduleOpt.loadClusterVersion()) - log.Infof("PD cluster leader %s is ready to serve", s.Name()) + log.Info("load cluster version", zap.Stringer("cluster-version", s.scheduleOpt.loadClusterVersion())) + log.Info("PD cluster leader is ready to serve", zap.String("leader-name", s.Name())) CheckPDVersion(s.scheduleOpt) tsTicker := time.NewTicker(updateTimestampStep) @@ -294,7 +298,7 @@ func (s *Server) campaignLeader() error { } etcdLeader := s.GetEtcdLeader() if etcdLeader != s.ID() { - log.Infof("etcd leader changed, %s resigns leadership", s.Name()) + log.Info("etcd leader changed, resigns leadership", zap.String("old-leader-name", s.Name())) return nil } case <-ctx.Done(): @@ -322,12 +326,14 @@ func (s *Server) watchLeader(leader *pdpb.Member, revision int64) { for wresp := range rch { // meet compacted error, use the compact revision. if wresp.CompactRevision != 0 { - log.Warnf("required revision %d has been compacted, use the compact revision %d", revision, wresp.CompactRevision) + log.Warn("required revision has been compacted, use the compact revision", + zap.Int64("required-revision", revision), + zap.Int64("compact-revision", wresp.CompactRevision)) revision = wresp.CompactRevision break } if wresp.Canceled { - log.Errorf("leader watcher is canceled with revision: %d, error: %s", revision, wresp.Err()) + log.Error("leader watcher is canceled with", zap.Int64("revision", revision), zap.Error(wresp.Err())) return } @@ -351,7 +357,7 @@ func (s *Server) watchLeader(leader *pdpb.Member, revision int64) { // ResignLeader resigns current PD's leadership. If nextLeader is empty, all // other pd-servers can campaign. func (s *Server) ResignLeader(nextLeader string) error { - log.Infof("%s tries to resign leader with next leader directive: %v", s.Name(), nextLeader) + log.Info("try to resign leader to next leader", zap.String("from", s.Name()), zap.String("to", nextLeader)) // Determine next leaders. 
 	var leaderIDs []uint64
 	res, err := etcdutil.ListEtcdMembers(s.client)
@@ -367,7 +373,7 @@ func (s *Server) ResignLeader(nextLeader string) error {
 		return errors.New("no valid pd to transfer leader")
 	}
 	nextLeaderID := leaderIDs[rand.Intn(len(leaderIDs))]
-	log.Infof("%s ready to resign leader, next leader: %v", s.Name(), nextLeaderID)
+	log.Info("ready to resign leader", zap.String("name", s.Name()), zap.Uint64("next-id", nextLeaderID))
 	err = s.MoveEtcdLeader(s.serverLoopCtx, s.ID(), nextLeaderID)
 	return errors.WithStack(err)
 }
diff --git a/server/systime_mon.go b/server/systime_mon.go
index d1e8fd09085..f8664be8aac 100644
--- a/server/systime_mon.go
+++ b/server/systime_mon.go
@@ -16,7 +16,8 @@ package server
 
 import (
 	"time"
-	log "github.com/sirupsen/logrus"
+	log "github.com/pingcap/log"
+	"go.uber.org/zap"
 )
 
 // StartMonitor calls systimeErrHandler if system time jump backward.
@@ -28,7 +29,7 @@ func StartMonitor(now func() time.Time, systimeErrHandler func()) {
 		last := now().UnixNano()
 		<-tick.C
 		if now().UnixNano() < last {
-			log.Errorf("system time jump backward, last:%v", last)
+			log.Error("system time jump backward", zap.Int64("last", last))
 			systimeErrHandler()
 		}
 	}
diff --git a/server/tso.go b/server/tso.go
index 73259bfdf37..f5f60658cb5 100644
--- a/server/tso.go
+++ b/server/tso.go
@@ -20,8 +20,9 @@ import (
 
 	"github.com/coreos/etcd/clientv3"
 	"github.com/pingcap/kvproto/pkg/pdpb"
+	log "github.com/pingcap/log"
 	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 )
 
 const (
@@ -91,7 +92,7 @@ func (s *Server) syncTimestamp() error {
 	// If the current system time minus the saved etcd timestamp is less than `updateTimestampGuard`,
 	// the timestamp allocation will start from the saved etcd timestamp temporarily.
 	if subTimeByWallClock(next, last) < updateTimestampGuard {
-		log.Errorf("system time may be incorrect: last: %v next %v", last, next)
+		log.Error("system time may be incorrect", zap.Time("last", last), zap.Time("next", next))
 		next = last.Add(updateTimestampGuard)
 	}
 
@@ -101,7 +102,7 @@ func (s *Server) syncTimestamp() error {
 	}
 
 	tsoCounter.WithLabelValues("sync_ok").Inc()
-	log.Infof("sync and save timestamp: last %v save %v next %v", last, save, next)
+	log.Info("sync and save timestamp", zap.Time("last", last), zap.Time("save", save), zap.Time("next", next))
 
 	current := &atomicObject{
 		physical: next,
@@ -134,7 +135,7 @@ func (s *Server) updateTimestamp() error {
 
 	jetLag := subTimeByWallClock(now, prev.physical)
 	if jetLag > 3*updateTimestampStep {
-		log.Warnf("clock offset: %v, prev: %v, now: %v", jetLag, prev.physical, now)
+		log.Warn("clock offset", zap.Duration("jet-lag", jetLag), zap.Time("prev-physical", prev.physical), zap.Time("now", now))
 		tsoCounter.WithLabelValues("slow_save").Inc()
 	}
 
@@ -150,7 +151,7 @@ func (s *Server) updateTimestamp() error {
 	} else if prevLogical > maxLogical/2 {
 		// The reason choosing maxLogical/2 here is that it's big enough for common cases.
 		// Because there is enough timestamp can be allocated before next update.
-		log.Warnf("the logical time may be not enough, prevLogical: %v", prevLogical)
+		log.Warn("the logical time may be not enough", zap.Int64("prev-logical", prevLogical))
 		next = prev.physical.Add(time.Millisecond)
 	} else {
 		// It will still use the previous physical time to alloc the timestamp.
@@ -185,7 +186,7 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
 	for i := 0; i < maxRetryCount; i++ {
 		current, ok := s.ts.Load().(*atomicObject)
 		if !ok || current.physical == zeroTime {
-			log.Errorf("we haven't synced timestamp ok, wait and retry, retry count %d", i)
+			log.Error("we haven't synced timestamp ok, wait and retry", zap.Int("retry-count", i))
 			time.Sleep(200 * time.Millisecond)
 			continue
 		}
@@ -193,7 +194,9 @@ func (s *Server) getRespTS(count uint32) (pdpb.Timestamp, error) {
 		resp.Physical = current.physical.UnixNano() / int64(time.Millisecond)
 		resp.Logical = atomic.AddInt64(&current.logical, int64(count))
 		if resp.Logical >= maxLogical {
-			log.Errorf("logical part outside of max logical interval %v, please check ntp time, retry count %d", resp, i)
+			log.Error("logical part outside of max logical interval, please check ntp time",
+				zap.Reflect("response", resp),
+				zap.Int("retry-count", i))
 			tsoCounter.WithLabelValues("logical_overflow").Inc()
 			time.Sleep(updateTimestampStep)
 			continue
diff --git a/server/version.go b/server/version.go
index 6b853fb88aa..485b3c8647b 100644
--- a/server/version.go
+++ b/server/version.go
@@ -15,8 +15,9 @@ package server
 
 import (
 	"github.com/coreos/go-semver/semver"
+	log "github.com/pingcap/log"
 	"github.com/pkg/errors"
-	log "github.com/sirupsen/logrus"
+	"go.uber.org/zap"
 )
 
 // Feature supported features.
@@ -53,7 +54,7 @@ var featuresDict = map[Feature]string{
 func MinSupportedVersion(v Feature) semver.Version {
 	target, ok := featuresDict[v]
 	if !ok {
-		log.Fatalf("the corresponding version of the feature %d doesn't exist", v)
+		log.Fatal("the corresponding version of the feature doesn't exist", zap.Int("feature-number", int(v)))
 	}
 	version := MustParseVersion(target)
 	return *version
@@ -76,7 +77,7 @@ func ParseVersion(v string) (*semver.Version, error) {
 func MustParseVersion(v string) *semver.Version {
 	ver, err := ParseVersion(v)
 	if err != nil {
-		log.Fatalf("version string is illegal: %s", err)
+		log.Fatal("version string is illegal", zap.Error(err))
 	}
 	return ver
 }
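
Note on the logging convention this patch adopts: every printf-style logrus call (log.Infof, log.Warnf, log.Errorf, log.Fatalf) is rewritten as a constant message plus typed zap fields. The standalone sketch below illustrates the before/after shape of one such call. It is not part of the patch; the store ID and address in it are made-up values, and it uses go.uber.org/zap directly. Since github.com/pingcap/log is a thin wrapper around zap, the same field constructors (zap.Uint64, zap.String, zap.Error, ...) appear in the diff above.

```go
package main

import "go.uber.org/zap"

func main() {
	// zap.NewExample() keeps the sketch self-contained; a real service would
	// typically use zap.NewProduction() or, in PD's case, pingcap/log's
	// globally configured logger.
	logger := zap.NewExample()
	defer logger.Sync()

	storeID := uint64(1)       // hypothetical store ID, for illustration only
	addr := "127.0.0.1:20160"  // hypothetical store address

	// Old logrus style: variables interpolated into the message string,
	// which is hard to parse, filter, or index downstream.
	//   log.Warnf("[store %d] store %s has been Offline", storeID, addr)

	// New structured style used throughout the patch: a constant message
	// plus typed key-value fields, emitted as structured output.
	logger.Warn("store has been offline",
		zap.Uint64("store-id", storeID),
		zap.String("store-address", addr))
}
```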