From c1457a9f7273d22274665cd3eca0831059d2d5e3 Mon Sep 17 00:00:00 2001 From: autom8ter Date: Wed, 30 Dec 2020 20:14:47 -0700 Subject: [PATCH] better raft --- .bumpversion.cfg | 2 ++ cmd/graphik/main.go | 35 +++++++++++++++++++++-------------- k8s.yaml | 3 +-- raft/options.go | 7 +++++++ raft/raft.go | 4 ++++ 5 files changed, 35 insertions(+), 16 deletions(-) diff --git a/.bumpversion.cfg b/.bumpversion.cfg index f800665..38a57d8 100644 --- a/.bumpversion.cfg +++ b/.bumpversion.cfg @@ -10,3 +10,5 @@ tag = False [bumpversion:file:README.md] [bumpversion:file:docker-compose.yml] + +[bumpversion:file:k8s.yaml] diff --git a/cmd/graphik/main.go b/cmd/graphik/main.go index 52d5810..69fb1be 100644 --- a/cmd/graphik/main.go +++ b/cmd/graphik/main.go @@ -85,6 +85,8 @@ func run(ctx context.Context, cfg *apipb.Flags) { lgger.Error("zero root users", zap.String("usage", pflag.CommandLine.Lookup("root-users").Usage)) return } + ctx, cancel := context.WithCancel(ctx) + defer cancel() var ( localRaftAddr = fmt.Sprintf("localhost:%v", global.ListenPort+1) adminLis net.Listener @@ -95,8 +97,7 @@ func run(ctx context.Context, cfg *apipb.Flags) { apiLis net.Listener advertise net.Addr ) - ctx, cancel := context.WithCancel(ctx) - defer cancel() + defer m.Close() signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM) defer signal.Stop(interrupt) if global.TlsCert != "" && global.TlsKey != "" { @@ -134,7 +135,7 @@ func run(ctx context.Context, cfg *apipb.Flags) { return } leaderIp := pods[leaderPod] - lgger.Info("registered k8s environment", + lgger.Debug("registered k8s environment", zap.String("namespace", namespace), zap.String("podip", podIp), zap.String("podname", podname), @@ -178,7 +179,7 @@ func run(ctx context.Context, cfg *apipb.Flags) { adminMux := cmux.New(adminLis) defer adminLis.Close() raftLis := adminMux.Match(cmux.Any()) - lgger.Info("starting raft listener", zap.String("address", raftLis.Addr().String())) + lgger.Info("starting raft server", zap.String("address", raftLis.Addr().String())) defer raftLis.Close() var metricServer *http.Server { @@ -233,6 +234,7 @@ func run(ctx context.Context, cfg *apipb.Flags) { raft.WithMaxPool(5), raft.WithRestoreSnapshotOnRestart(false), raft.WithTimeout(3 * time.Second), + raft.WithDebug(global.Debug), } if global.RaftAdvertise != "" { advertise, err = net.ResolveTCPAddr("tcp", global.RaftAdvertise) @@ -360,17 +362,19 @@ func run(ctx context.Context, cfg *apipb.Flags) { } }) - if global.JoinRaft != "" { - lgger.Info("joining raft cluster", - zap.String("joinAddr", global.JoinRaft), - zap.String("localAddr", localRaftAddr), - ) + m.Go(func(routine machine.Routine) { + if global.JoinRaft != "" { + lgger.Debug("joining raft cluster", + zap.String("joinAddr", global.JoinRaft), + zap.String("localAddr", localRaftAddr), + ) - if err := join(ctx, global.JoinRaft, localRaftAddr, g, lgger); err != nil { - lgger.Error("failed to join raft", zap.Error(err)) - cancel() + if err := join(ctx, global.JoinRaft, localRaftAddr, g, lgger); err != nil { + lgger.Error("failed to join raft", zap.Error(err)) + cancel() + } } - } + }) select { case <-interrupt: m.Cancel() @@ -400,7 +404,7 @@ func run(ctx context.Context, cfg *apipb.Flags) { } m.Wait() g.Close() - lgger.Info("shutdown successful") + lgger.Debug("shutdown successful") } func join(ctx context.Context, joinAddr, localAddr string, g *database.Graph, lgger *logger.Logger) error { @@ -412,6 +416,9 @@ func join(ctx context.Context, joinAddr, localAddr string, g *database.Graph, lg defer leaderConn.Close() rclient := apipb.NewRaftServiceClient(leaderConn) for x := 0; x < 30; x++ { + if ctx.Err() != nil { + return nil + } if err := pingTCP(localAddr); err != nil { lgger.Error("failed to join cluster - retrying", zap.Error(err), zap.Int("attempt", x+1), diff --git a/k8s.yaml b/k8s.yaml index 2fb4b2c..0748a0b 100644 --- a/k8s.yaml +++ b/k8s.yaml @@ -91,8 +91,7 @@ spec: valueFrom: fieldRef: fieldPath: status.podIP - #image: graphikdb/graphik:v1.0.0 - image: graphikdb/graphik:testing + image: graphikdb/graphik:1.1.0 imagePullPolicy: Always ports: - containerPort: 7821 diff --git a/raft/options.go b/raft/options.go index 772fb2a..b6bf6f6 100644 --- a/raft/options.go +++ b/raft/options.go @@ -19,6 +19,7 @@ type Options struct { heartbeatTimeout time.Duration commitTimeout time.Duration leaseTimeout time.Duration + debug bool } func (o *Options) setDefaults() { @@ -114,3 +115,9 @@ func WithCommitTimeout(timeout time.Duration) Opt { o.commitTimeout = timeout } } + +func WithDebug(debug bool) Opt { + return func(o *Options) { + o.debug = debug + } +} diff --git a/raft/raft.go b/raft/raft.go index 0614dce..1a3d88d 100644 --- a/raft/raft.go +++ b/raft/raft.go @@ -30,6 +30,10 @@ func NewRaft(fsm *fsm.FSM, lis net.Listener, opts ...Opt) (*Raft, error) { } options.setDefaults() config := raft.DefaultConfig() + config.LogLevel = "INFO" + if options.debug { + config.LogLevel = "DEBUG" + } config.NoSnapshotRestoreOnStart = !options.restoreSnapshotOnRestart config.LocalID = raft.ServerID(options.peerID) if options.leaseTimeout != 0 {