Skip to content

Commit

Permalink
fix: timeout while setuping
Browse files Browse the repository at this point in the history
Signed-off-by: Jorropo <jorropo.pgm@gmail.com>
  • Loading branch information
Jorropo committed Nov 24, 2020
1 parent 0a755d1 commit 1797eb2
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 48 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ go/cmd/berty-doctor/berty-doctor
go/cmd/berty-integration/berty-integration
go/cmd/betabot/betabot
go/cmd/rdvp/rdvp

# Various berty outputs
betabot.store
benchmark_result.json

# Delve's binaries
*__debug_bin
1 change: 1 addition & 0 deletions go/cmd/berty/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func daemonCommand() *ffcli.Command {
manager.SetupLocalMessengerServerFlags(fs) // we want to configure a local messenger server
manager.SetupDefaultGRPCListenersFlags(fs)
manager.SetupMetricsFlags(fs)
manager.SetupInitTimeout(fs)
return fs, nil
}

Expand Down
1 change: 1 addition & 0 deletions go/cmd/berty/mini.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func miniCommand() *ffcli.Command {
manager.SetupLocalMessengerServerFlags(fs) // add flags to allow creating a full node in the same process
manager.SetupEmptyGRPCListenersFlags(fs) // by default, we don't want to expose gRPC server for mini
manager.SetupRemoteNodeFlags(fs) // mini can be run against an already running server
manager.SetupInitTimeout(fs)
return fs, nil
}

Expand Down
8 changes: 4 additions & 4 deletions go/internal/initutil/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (m *Manager) SetupDatastoreFlags(fs *flag.FlagSet) {
}

func (m *Manager) GetDatastoreDir() (string, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getDatastoreDir()
}

Expand Down Expand Up @@ -65,8 +65,8 @@ func (m *Manager) getDatastoreDir() (string, error) {
}

func (m *Manager) GetRootDatastore() (datastore.Batching, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getRootDatastore()
}

Expand Down
17 changes: 8 additions & 9 deletions go/internal/initutil/ipfs.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ func (m *Manager) SetupLocalIPFSFlags(fs *flag.FlagSet) {
}

func (m *Manager) GetLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getLocalIPFS()
}
Expand Down Expand Up @@ -202,7 +201,7 @@ func (m *Manager) getLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode,

if m.Node.Protocol.RelayHack {
// Resolving addresses
pis, err := ipfsutil.ParseAndResolveRdvpMaddrs(m.GetContext(), m.initLogger, config.Config.P2P.RelayHack)
pis, err := ipfsutil.ParseAndResolveRdvpMaddrs(m.getContext(), m.initLogger, config.Config.P2P.RelayHack)
if err != nil {
return nil, nil, errcode.TODO.Wrap(err)
}
Expand Down Expand Up @@ -343,7 +342,7 @@ func (m *Manager) getLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode,
if err != nil {
return err
}
m.Node.Protocol.pubsub, err = pubsub.NewGossipSub(m.GetContext(), h,
m.Node.Protocol.pubsub, err = pubsub.NewGossipSub(m.getContext(), h,
pubsub.WithMessageSigning(true),
pubsub.WithFloodPublish(true),
pubsub.WithDiscovery(m.Node.Protocol.discovery),
Expand All @@ -367,7 +366,7 @@ func (m *Manager) getLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode,

ipfsDS := ipfsutil.NewNamespacedDatastore(rootDS, datastore.NewKey(bertyprotocol.NamespaceIPFSDatastore))

m.Node.Protocol.ipfsAPI, m.Node.Protocol.ipfsNode, err = ipfsutil.NewCoreAPIFromDatastore(m.GetContext(), ipfsDS, &opts)
m.Node.Protocol.ipfsAPI, m.Node.Protocol.ipfsNode, err = ipfsutil.NewCoreAPIFromDatastore(m.getContext(), ipfsDS, &opts)
if err != nil {
return nil, nil, errcode.TODO.Wrap(err)
}
Expand All @@ -378,18 +377,18 @@ func (m *Manager) getLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode,
return nil, nil, errcode.TODO.Wrap(err)
}

m.Node.Protocol.ipfsAPI, m.Node.Protocol.ipfsNode, err = ipfsutil.NewCoreAPIFromRepo(m.GetContext(), repo, &opts)
m.Node.Protocol.ipfsAPI, m.Node.Protocol.ipfsNode, err = ipfsutil.NewCoreAPIFromRepo(m.getContext(), repo, &opts)
if err != nil {
return nil, nil, errcode.TODO.Wrap(err)
}
}

// PubSub
psapi := ipfsutil.NewPubSubAPI(m.GetContext(), logger.Named("ps"), m.Node.Protocol.discovery, m.Node.Protocol.pubsub)
psapi := ipfsutil.NewPubSubAPI(m.getContext(), logger.Named("ps"), m.Node.Protocol.discovery, m.Node.Protocol.pubsub)
m.Node.Protocol.ipfsAPI = ipfsutil.InjectPubSubCoreAPIExtendedAdaptater(m.Node.Protocol.ipfsAPI, psapi)

// enable conn logger
ipfsutil.EnableConnLogger(m.GetContext(), logger, m.Node.Protocol.ipfsNode.PeerHost)
ipfsutil.EnableConnLogger(m.getContext(), logger, m.Node.Protocol.ipfsNode.PeerHost)

// register metrics
if m.Metrics.Listener != "" {
Expand Down Expand Up @@ -436,7 +435,7 @@ func (m *Manager) getRdvpMaddrs() ([]*peer.AddrInfo, error) {
}
}

return ipfsutil.ParseAndResolveRdvpMaddrs(m.GetContext(), m.initLogger, addrs)
return ipfsutil.ParseAndResolveRdvpMaddrs(m.getContext(), m.initLogger, addrs)
}

func (m *Manager) getSwarmAddrs() []string {
Expand Down
4 changes: 2 additions & 2 deletions go/internal/initutil/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ func (m *Manager) SetupLoggingFlags(fs *flag.FlagSet) {
}

func (m *Manager) GetLogger() (*zap.Logger, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getLogger()
}

Expand Down
40 changes: 38 additions & 2 deletions go/internal/initutil/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package initutil

import (
"context"
"flag"
"fmt"
"strings"
"sync"
Expand Down Expand Up @@ -123,6 +124,7 @@ type Manager struct {
listeners []grpcutil.Listener
} `json:"GRPC,omitempty"`
} `json:"Node,omitempty"`
InitTimeout time.Duration `json:"InitTimeout,omitempty"`

// internal
ctx context.Context
Expand Down Expand Up @@ -180,6 +182,12 @@ func (m *Manager) applyDefaults() {
}

func (m *Manager) GetContext() context.Context {
m.mutex.Lock()
defer m.mutex.Unlock()
return m.getContext()
}

func (m *Manager) getContext() context.Context {
if m.ctx == nil {
m.ctx, m.ctxCancel = context.WithCancel(context.Background())
}
Expand All @@ -188,8 +196,8 @@ func (m *Manager) GetContext() context.Context {

func (m *Manager) RunWorkers() error {
m.workers.Add(func() error {
<-m.GetContext().Done()
return m.GetContext().Err()
<-m.getContext().Done()
return m.getContext().Err()
}, func(error) {
m.ctxCancel()
})
Expand Down Expand Up @@ -267,3 +275,31 @@ func (m *Manager) AdvancedHelp() string {
tw.Flush()
return strings.TrimSpace(b.String())
}

func (m *Manager) SetupInitTimeout(fs *flag.FlagSet) {
fs.DurationVar(&m.InitTimeout, "node.init-timeout", time.Minute, "maximum time allowed for the initialization")
}

// prepareForGetter prepare locks and ctx timeouts for an external getter.
// it returns a cleanup and should be defered.
func (m *Manager) prepareForGetter() func() {
m.mutex.Lock()
if m.InitTimeout > 0 {
finishChan := make(chan struct{})
ticker := time.NewTimer(m.InitTimeout)
go func() {
select {
case <-finishChan:
case <-m.ctx.Done():
case <-ticker.C:
m.ctxCancel()
}
}()
return func() {
close(finishChan)
ticker.Stop()
m.mutex.Unlock()
}
}
return m.mutex.Unlock
}
3 changes: 1 addition & 2 deletions go/internal/initutil/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ func (m *Manager) SetupMetricsFlags(fs *flag.FlagSet) {
}

func (m *Manager) GetMetricsRegistry() (*prometheus.Registry, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getMetricsRegistry()
}
Expand Down
46 changes: 23 additions & 23 deletions go/internal/initutil/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,10 @@ func (m *Manager) applyPreset() error {
}

func (m *Manager) GetLocalProtocolServer() (bertyprotocol.Service, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
if m.GetContext().Err() != nil {
return nil, m.GetContext().Err()
defer m.prepareForGetter()()

if m.getContext().Err() != nil {
return nil, m.getContext().Err()
}
return m.getLocalProtocolServer()
}
Expand Down Expand Up @@ -194,14 +194,14 @@ func (m *Manager) getLocalProtocolServer() (bertyprotocol.Service, error) {
OrbitDB: odb,
}

m.Node.Protocol.server, err = bertyprotocol.New(m.GetContext(), opts)
m.Node.Protocol.server, err = bertyprotocol.New(m.getContext(), opts)
if err != nil {
return nil, errcode.TODO.Wrap(err)
}

// register grpc service
bertyprotocol.RegisterProtocolServiceServer(grpcServer, m.Node.Protocol.server)
if err := bertyprotocol.RegisterProtocolServiceHandlerServer(m.GetContext(), gatewayMux, m.Node.Protocol.server); err != nil {
if err := bertyprotocol.RegisterProtocolServiceHandlerServer(m.getContext(), gatewayMux, m.Node.Protocol.server); err != nil {
return nil, errcode.TODO.Wrap(err)
}
}
Expand All @@ -211,8 +211,8 @@ func (m *Manager) getLocalProtocolServer() (bertyprotocol.Service, error) {
}

func (m *Manager) GetGRPCClientConn() (*grpc.ClientConn, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getGRPCClientConn()
}

Expand Down Expand Up @@ -263,7 +263,7 @@ func (m *Manager) getGRPCClientConn() (*grpc.ClientConn, error) {
grpcServer := grpc.NewServer(serverOpts...)

// buffer-based client conn
bl := grpcutil.NewBufListener(m.GetContext(), 256*1024)
bl := grpcutil.NewBufListener(m.getContext(), 256*1024)
cc, err := bl.NewClientConn(clientOpts...)
if err != nil {
return nil, errcode.TODO.Wrap(err)
Expand Down Expand Up @@ -292,8 +292,8 @@ func (m *Manager) getGRPCClientConn() (*grpc.ClientConn, error) {
}

func (m *Manager) GetMessengerClient() (bertymessenger.MessengerServiceClient, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getMessengerClient()
}

Expand All @@ -311,8 +311,8 @@ func (m *Manager) SetLifecycleManager(manager *lifecycle.Manager) {
}

func (m *Manager) GetLifecycleManager() *lifecycle.Manager {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getLifecycleManager()
}

Expand Down Expand Up @@ -341,8 +341,8 @@ func (m *Manager) getMessengerClient() (bertymessenger.MessengerServiceClient, e
}

func (m *Manager) GetProtocolClient() (bertyprotocol.ProtocolServiceClient, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getProtocolClient()
}

Expand All @@ -362,8 +362,8 @@ func (m *Manager) getProtocolClient() (bertyprotocol.ProtocolServiceClient, erro
}

func (m *Manager) GetGRPCServer() (*grpc.Server, *grpcgw.ServeMux, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getGRPCServer()
}

Expand Down Expand Up @@ -472,8 +472,8 @@ func (m *Manager) GetGRPCListeners() []grpcutil.Listener {
}

func (m *Manager) GetMessengerDB() (*gorm.DB, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getMessengerDB()
}

Expand Down Expand Up @@ -556,8 +556,8 @@ func (m *Manager) restoreMessengerDataFromExport() error {
}

func (m *Manager) GetLocalMessengerServer() (bertymessenger.MessengerServiceServer, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getLocalMessengerServer()
}

Expand Down Expand Up @@ -602,7 +602,7 @@ func (m *Manager) getLocalMessengerServer() (bertymessenger.MessengerServiceServ
}

// protocol client
protocolClient, err := bertyprotocol.NewClient(m.GetContext(), protocolServer, nil, nil) // FIXME: setup tracing
protocolClient, err := bertyprotocol.NewClient(m.getContext(), protocolServer, nil, nil) // FIXME: setup tracing
if err != nil {
return nil, errcode.TODO.Wrap(err)
}
Expand All @@ -626,7 +626,7 @@ func (m *Manager) getLocalMessengerServer() (bertymessenger.MessengerServiceServ

// register grpc service
bertymessenger.RegisterMessengerServiceServer(grpcServer, messengerServer)
if err := bertymessenger.RegisterMessengerServiceHandlerServer(m.GetContext(), gatewayMux, messengerServer); err != nil {
if err := bertymessenger.RegisterMessengerServiceHandlerServer(m.getContext(), gatewayMux, messengerServer); err != nil {
return nil, errcode.TODO.Wrap(err)
}

Expand Down
4 changes: 2 additions & 2 deletions go/internal/initutil/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ func (m *Manager) SetNotificationManager(manager notification.Manager) {
}

func (m *Manager) GetNotificationManager() (notification.Manager, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getNotificationManager()
}

Expand Down
7 changes: 3 additions & 4 deletions go/internal/initutil/orbitdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,15 @@ func (m *Manager) getOrbitDB() (*bertyprotocol.BertyOrbitDB, error) {
}

if node.PubSub != nil {
self, err := ipfs.Key().Self(m.GetContext())
self, err := ipfs.Key().Self(m.getContext())
if err != nil {
return nil, errcode.TODO.Wrap(err)
}

opts.PubSub = pubsubraw.NewPubSub(node.PubSub, self.ID(), opts.Logger, nil)
}

odb, err := bertyprotocol.NewBertyOrbitDB(m.GetContext(), ipfs, opts)
odb, err := bertyprotocol.NewBertyOrbitDB(m.getContext(), ipfs, opts)
if err != nil {
return nil, errcode.TODO.Wrap(err)
}
Expand All @@ -81,8 +81,7 @@ func (m *Manager) getOrbitDB() (*bertyprotocol.BertyOrbitDB, error) {
}

func (m *Manager) GetOrbitDB() (*bertyprotocol.BertyOrbitDB, error) {
m.mutex.Lock()
defer m.mutex.Unlock()
defer m.prepareForGetter()()

return m.getOrbitDB()
}

0 comments on commit 1797eb2

Please sign in to comment.