Skip to content

Commit

Permalink
better raft
Browse files Browse the repository at this point in the history
  • Loading branch information
autom8ter committed Dec 31, 2020
1 parent c90f6f3 commit c1457a9
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 16 deletions.
2 changes: 2 additions & 0 deletions .bumpversion.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,5 @@ tag = False
[bumpversion:file:README.md]

[bumpversion:file:docker-compose.yml]

[bumpversion:file:k8s.yaml]
35 changes: 21 additions & 14 deletions cmd/graphik/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func run(ctx context.Context, cfg *apipb.Flags) {
lgger.Error("zero root users", zap.String("usage", pflag.CommandLine.Lookup("root-users").Usage))
return
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var (
localRaftAddr = fmt.Sprintf("localhost:%v", global.ListenPort+1)
adminLis net.Listener
Expand All @@ -95,8 +97,7 @@ func run(ctx context.Context, cfg *apipb.Flags) {
apiLis net.Listener
advertise net.Addr
)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer m.Close()
signal.Notify(interrupt, os.Interrupt, syscall.SIGTERM)
defer signal.Stop(interrupt)
if global.TlsCert != "" && global.TlsKey != "" {
Expand Down Expand Up @@ -134,7 +135,7 @@ func run(ctx context.Context, cfg *apipb.Flags) {
return
}
leaderIp := pods[leaderPod]
lgger.Info("registered k8s environment",
lgger.Debug("registered k8s environment",
zap.String("namespace", namespace),
zap.String("podip", podIp),
zap.String("podname", podname),
Expand Down Expand Up @@ -178,7 +179,7 @@ func run(ctx context.Context, cfg *apipb.Flags) {
adminMux := cmux.New(adminLis)
defer adminLis.Close()
raftLis := adminMux.Match(cmux.Any())
lgger.Info("starting raft listener", zap.String("address", raftLis.Addr().String()))
lgger.Info("starting raft server", zap.String("address", raftLis.Addr().String()))
defer raftLis.Close()
var metricServer *http.Server
{
Expand Down Expand Up @@ -233,6 +234,7 @@ func run(ctx context.Context, cfg *apipb.Flags) {
raft.WithMaxPool(5),
raft.WithRestoreSnapshotOnRestart(false),
raft.WithTimeout(3 * time.Second),
raft.WithDebug(global.Debug),
}
if global.RaftAdvertise != "" {
advertise, err = net.ResolveTCPAddr("tcp", global.RaftAdvertise)
Expand Down Expand Up @@ -360,17 +362,19 @@ func run(ctx context.Context, cfg *apipb.Flags) {
}
})

if global.JoinRaft != "" {
lgger.Info("joining raft cluster",
zap.String("joinAddr", global.JoinRaft),
zap.String("localAddr", localRaftAddr),
)
m.Go(func(routine machine.Routine) {
if global.JoinRaft != "" {
lgger.Debug("joining raft cluster",
zap.String("joinAddr", global.JoinRaft),
zap.String("localAddr", localRaftAddr),
)

if err := join(ctx, global.JoinRaft, localRaftAddr, g, lgger); err != nil {
lgger.Error("failed to join raft", zap.Error(err))
cancel()
if err := join(ctx, global.JoinRaft, localRaftAddr, g, lgger); err != nil {
lgger.Error("failed to join raft", zap.Error(err))
cancel()
}
}
}
})
select {
case <-interrupt:
m.Cancel()
Expand Down Expand Up @@ -400,7 +404,7 @@ func run(ctx context.Context, cfg *apipb.Flags) {
}
m.Wait()
g.Close()
lgger.Info("shutdown successful")
lgger.Debug("shutdown successful")
}

func join(ctx context.Context, joinAddr, localAddr string, g *database.Graph, lgger *logger.Logger) error {
Expand All @@ -412,6 +416,9 @@ func join(ctx context.Context, joinAddr, localAddr string, g *database.Graph, lg
defer leaderConn.Close()
rclient := apipb.NewRaftServiceClient(leaderConn)
for x := 0; x < 30; x++ {
if ctx.Err() != nil {
return nil
}
if err := pingTCP(localAddr); err != nil {
lgger.Error("failed to join cluster - retrying", zap.Error(err),
zap.Int("attempt", x+1),
Expand Down
3 changes: 1 addition & 2 deletions k8s.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,7 @@ spec:
valueFrom:
fieldRef:
fieldPath: status.podIP
#image: graphikdb/graphik:v1.0.0
image: graphikdb/graphik:testing
image: graphikdb/graphik:1.1.0
imagePullPolicy: Always
ports:
- containerPort: 7821
Expand Down
7 changes: 7 additions & 0 deletions raft/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type Options struct {
heartbeatTimeout time.Duration
commitTimeout time.Duration
leaseTimeout time.Duration
debug bool
}

func (o *Options) setDefaults() {
Expand Down Expand Up @@ -114,3 +115,9 @@ func WithCommitTimeout(timeout time.Duration) Opt {
o.commitTimeout = timeout
}
}

func WithDebug(debug bool) Opt {
return func(o *Options) {
o.debug = debug
}
}
4 changes: 4 additions & 0 deletions raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ func NewRaft(fsm *fsm.FSM, lis net.Listener, opts ...Opt) (*Raft, error) {
}
options.setDefaults()
config := raft.DefaultConfig()
config.LogLevel = "INFO"
if options.debug {
config.LogLevel = "DEBUG"
}
config.NoSnapshotRestoreOnStart = !options.restoreSnapshotOnRestart
config.LocalID = raft.ServerID(options.peerID)
if options.leaseTimeout != 0 {
Expand Down

0 comments on commit c1457a9

Please sign in to comment.