diff --git a/.gitignore b/.gitignore index 32795cfb4e..aee784915a 100644 --- a/.gitignore +++ b/.gitignore @@ -25,7 +25,10 @@ go/cmd/berty-doctor/berty-doctor go/cmd/berty-integration/berty-integration go/cmd/betabot/betabot go/cmd/rdvp/rdvp + +# Various berty outputs betabot.store +benchmark_result.json # Delve's binaries *__debug_bin diff --git a/go/cmd/berty/daemon.go b/go/cmd/berty/daemon.go index 0f8582f10d..0f2e37f4f1 100644 --- a/go/cmd/berty/daemon.go +++ b/go/cmd/berty/daemon.go @@ -19,6 +19,7 @@ func daemonCommand() *ffcli.Command { manager.SetupLocalMessengerServerFlags(fs) // we want to configure a local messenger server manager.SetupDefaultGRPCListenersFlags(fs) manager.SetupMetricsFlags(fs) + manager.SetupInitTimeout(fs) return fs, nil } diff --git a/go/cmd/berty/mini.go b/go/cmd/berty/mini.go index d79ff06375..39168128a4 100644 --- a/go/cmd/berty/mini.go +++ b/go/cmd/berty/mini.go @@ -20,6 +20,7 @@ func miniCommand() *ffcli.Command { manager.SetupLocalMessengerServerFlags(fs) // add flags to allow creating a full node in the same process manager.SetupEmptyGRPCListenersFlags(fs) // by default, we don't want to expose gRPC server for mini manager.SetupRemoteNodeFlags(fs) // mini can be run against an already running server + manager.SetupInitTimeout(fs) return fs, nil } diff --git a/go/internal/initutil/datastore.go b/go/internal/initutil/datastore.go index 0208dbe62a..3e59082d90 100644 --- a/go/internal/initutil/datastore.go +++ b/go/internal/initutil/datastore.go @@ -28,8 +28,8 @@ func (m *Manager) SetupDatastoreFlags(fs *flag.FlagSet) { } func (m *Manager) GetDatastoreDir() (string, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() + return m.getDatastoreDir() } @@ -65,8 +65,8 @@ func (m *Manager) getDatastoreDir() (string, error) { } func (m *Manager) GetRootDatastore() (datastore.Batching, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() + return m.getRootDatastore() } diff --git a/go/internal/initutil/ipfs.go b/go/internal/initutil/ipfs.go index 1459734a69..7b64ce6211 100644 --- a/go/internal/initutil/ipfs.go +++ b/go/internal/initutil/ipfs.go @@ -80,8 +80,7 @@ func (m *Manager) SetupLocalIPFSFlags(fs *flag.FlagSet) { } func (m *Manager) GetLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() return m.getLocalIPFS() } @@ -202,7 +201,7 @@ func (m *Manager) getLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode, if m.Node.Protocol.RelayHack { // Resolving addresses - pis, err := ipfsutil.ParseAndResolveRdvpMaddrs(m.GetContext(), m.initLogger, config.Config.P2P.RelayHack) + pis, err := ipfsutil.ParseAndResolveRdvpMaddrs(m.getContext(), m.initLogger, config.Config.P2P.RelayHack) if err != nil { return nil, nil, errcode.TODO.Wrap(err) } @@ -343,7 +342,7 @@ func (m *Manager) getLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode, if err != nil { return err } - m.Node.Protocol.pubsub, err = pubsub.NewGossipSub(m.GetContext(), h, + m.Node.Protocol.pubsub, err = pubsub.NewGossipSub(m.getContext(), h, pubsub.WithMessageSigning(true), pubsub.WithFloodPublish(true), pubsub.WithDiscovery(m.Node.Protocol.discovery), @@ -367,7 +366,7 @@ func (m *Manager) getLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode, ipfsDS := ipfsutil.NewNamespacedDatastore(rootDS, datastore.NewKey(bertyprotocol.NamespaceIPFSDatastore)) - m.Node.Protocol.ipfsAPI, m.Node.Protocol.ipfsNode, err = ipfsutil.NewCoreAPIFromDatastore(m.GetContext(), ipfsDS, &opts) + m.Node.Protocol.ipfsAPI, m.Node.Protocol.ipfsNode, err = ipfsutil.NewCoreAPIFromDatastore(m.getContext(), ipfsDS, &opts) if err != nil { return nil, nil, errcode.TODO.Wrap(err) } @@ -378,18 +377,18 @@ func (m *Manager) getLocalIPFS() (ipfsutil.ExtendedCoreAPI, *ipfs_core.IpfsNode, return nil, nil, errcode.TODO.Wrap(err) } - m.Node.Protocol.ipfsAPI, m.Node.Protocol.ipfsNode, err = ipfsutil.NewCoreAPIFromRepo(m.GetContext(), repo, &opts) + m.Node.Protocol.ipfsAPI, m.Node.Protocol.ipfsNode, err = ipfsutil.NewCoreAPIFromRepo(m.getContext(), repo, &opts) if err != nil { return nil, nil, errcode.TODO.Wrap(err) } } // PubSub - psapi := ipfsutil.NewPubSubAPI(m.GetContext(), logger.Named("ps"), m.Node.Protocol.discovery, m.Node.Protocol.pubsub) + psapi := ipfsutil.NewPubSubAPI(m.getContext(), logger.Named("ps"), m.Node.Protocol.discovery, m.Node.Protocol.pubsub) m.Node.Protocol.ipfsAPI = ipfsutil.InjectPubSubCoreAPIExtendedAdaptater(m.Node.Protocol.ipfsAPI, psapi) // enable conn logger - ipfsutil.EnableConnLogger(m.GetContext(), logger, m.Node.Protocol.ipfsNode.PeerHost) + ipfsutil.EnableConnLogger(m.getContext(), logger, m.Node.Protocol.ipfsNode.PeerHost) // register metrics if m.Metrics.Listener != "" { @@ -436,7 +435,7 @@ func (m *Manager) getRdvpMaddrs() ([]*peer.AddrInfo, error) { } } - return ipfsutil.ParseAndResolveRdvpMaddrs(m.GetContext(), m.initLogger, addrs) + return ipfsutil.ParseAndResolveRdvpMaddrs(m.getContext(), m.initLogger, addrs) } func (m *Manager) getSwarmAddrs() []string { diff --git a/go/internal/initutil/logger.go b/go/internal/initutil/logger.go index f171e33f8b..c3c312cf1a 100644 --- a/go/internal/initutil/logger.go +++ b/go/internal/initutil/logger.go @@ -34,8 +34,8 @@ func (m *Manager) SetupLoggingFlags(fs *flag.FlagSet) { } func (m *Manager) GetLogger() (*zap.Logger, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() + return m.getLogger() } diff --git a/go/internal/initutil/manager.go b/go/internal/initutil/manager.go index 2ec0793931..c456c06e59 100644 --- a/go/internal/initutil/manager.go +++ b/go/internal/initutil/manager.go @@ -2,6 +2,7 @@ package initutil import ( "context" + "flag" "fmt" "strings" "sync" @@ -123,6 +124,7 @@ type Manager struct { listeners []grpcutil.Listener } `json:"GRPC,omitempty"` } `json:"Node,omitempty"` + InitTimeout time.Duration `json:"InitTimeout,omitempty"` // internal ctx context.Context @@ -180,6 +182,12 @@ func (m *Manager) applyDefaults() { } func (m *Manager) GetContext() context.Context { + m.mutex.Lock() + defer m.mutex.Unlock() + return m.getContext() +} + +func (m *Manager) getContext() context.Context { if m.ctx == nil { m.ctx, m.ctxCancel = context.WithCancel(context.Background()) } @@ -188,8 +196,8 @@ func (m *Manager) GetContext() context.Context { func (m *Manager) RunWorkers() error { m.workers.Add(func() error { - <-m.GetContext().Done() - return m.GetContext().Err() + <-m.getContext().Done() + return m.getContext().Err() }, func(error) { m.ctxCancel() }) @@ -267,3 +275,31 @@ func (m *Manager) AdvancedHelp() string { tw.Flush() return strings.TrimSpace(b.String()) } + +func (m *Manager) SetupInitTimeout(fs *flag.FlagSet) { + fs.DurationVar(&m.InitTimeout, "node.init-timeout", time.Minute, "maximum time allowed for the initialization") +} + +// prepareForGetter prepare locks and ctx timeouts for an external getter. +// it returns a cleanup and should be defered. +func (m *Manager) prepareForGetter() func() { + m.mutex.Lock() + if m.InitTimeout > 0 { + finishChan := make(chan struct{}) + ticker := time.NewTimer(m.InitTimeout) + go func() { + select { + case <-finishChan: + case <-m.ctx.Done(): + case <-ticker.C: + m.ctxCancel() + } + }() + return func() { + close(finishChan) + ticker.Stop() + m.mutex.Unlock() + } + } + return m.mutex.Unlock +} diff --git a/go/internal/initutil/metrics.go b/go/internal/initutil/metrics.go index 9ce84de6b1..2e25e0f9d9 100644 --- a/go/internal/initutil/metrics.go +++ b/go/internal/initutil/metrics.go @@ -18,8 +18,7 @@ func (m *Manager) SetupMetricsFlags(fs *flag.FlagSet) { } func (m *Manager) GetMetricsRegistry() (*prometheus.Registry, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() return m.getMetricsRegistry() } diff --git a/go/internal/initutil/node.go b/go/internal/initutil/node.go index f5ad0e46f5..950d814f87 100644 --- a/go/internal/initutil/node.go +++ b/go/internal/initutil/node.go @@ -123,10 +123,10 @@ func (m *Manager) applyPreset() error { } func (m *Manager) GetLocalProtocolServer() (bertyprotocol.Service, error) { - m.mutex.Lock() - defer m.mutex.Unlock() - if m.GetContext().Err() != nil { - return nil, m.GetContext().Err() + defer m.prepareForGetter()() + + if m.getContext().Err() != nil { + return nil, m.getContext().Err() } return m.getLocalProtocolServer() } @@ -194,14 +194,14 @@ func (m *Manager) getLocalProtocolServer() (bertyprotocol.Service, error) { OrbitDB: odb, } - m.Node.Protocol.server, err = bertyprotocol.New(m.GetContext(), opts) + m.Node.Protocol.server, err = bertyprotocol.New(m.getContext(), opts) if err != nil { return nil, errcode.TODO.Wrap(err) } // register grpc service bertyprotocol.RegisterProtocolServiceServer(grpcServer, m.Node.Protocol.server) - if err := bertyprotocol.RegisterProtocolServiceHandlerServer(m.GetContext(), gatewayMux, m.Node.Protocol.server); err != nil { + if err := bertyprotocol.RegisterProtocolServiceHandlerServer(m.getContext(), gatewayMux, m.Node.Protocol.server); err != nil { return nil, errcode.TODO.Wrap(err) } } @@ -211,8 +211,8 @@ func (m *Manager) getLocalProtocolServer() (bertyprotocol.Service, error) { } func (m *Manager) GetGRPCClientConn() (*grpc.ClientConn, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() + return m.getGRPCClientConn() } @@ -263,7 +263,7 @@ func (m *Manager) getGRPCClientConn() (*grpc.ClientConn, error) { grpcServer := grpc.NewServer(serverOpts...) // buffer-based client conn - bl := grpcutil.NewBufListener(m.GetContext(), 256*1024) + bl := grpcutil.NewBufListener(m.getContext(), 256*1024) cc, err := bl.NewClientConn(clientOpts...) if err != nil { return nil, errcode.TODO.Wrap(err) @@ -292,8 +292,8 @@ func (m *Manager) getGRPCClientConn() (*grpc.ClientConn, error) { } func (m *Manager) GetMessengerClient() (bertymessenger.MessengerServiceClient, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() + return m.getMessengerClient() } @@ -311,8 +311,8 @@ func (m *Manager) SetLifecycleManager(manager *lifecycle.Manager) { } func (m *Manager) GetLifecycleManager() *lifecycle.Manager { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() + return m.getLifecycleManager() } @@ -341,8 +341,8 @@ func (m *Manager) getMessengerClient() (bertymessenger.MessengerServiceClient, e } func (m *Manager) GetProtocolClient() (bertyprotocol.ProtocolServiceClient, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() + return m.getProtocolClient() } @@ -362,8 +362,8 @@ func (m *Manager) getProtocolClient() (bertyprotocol.ProtocolServiceClient, erro } func (m *Manager) GetGRPCServer() (*grpc.Server, *grpcgw.ServeMux, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() + return m.getGRPCServer() } @@ -472,8 +472,8 @@ func (m *Manager) GetGRPCListeners() []grpcutil.Listener { } func (m *Manager) GetMessengerDB() (*gorm.DB, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() + return m.getMessengerDB() } @@ -556,8 +556,8 @@ func (m *Manager) restoreMessengerDataFromExport() error { } func (m *Manager) GetLocalMessengerServer() (bertymessenger.MessengerServiceServer, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() + return m.getLocalMessengerServer() } @@ -602,7 +602,7 @@ func (m *Manager) getLocalMessengerServer() (bertymessenger.MessengerServiceServ } // protocol client - protocolClient, err := bertyprotocol.NewClient(m.GetContext(), protocolServer, nil, nil) // FIXME: setup tracing + protocolClient, err := bertyprotocol.NewClient(m.getContext(), protocolServer, nil, nil) // FIXME: setup tracing if err != nil { return nil, errcode.TODO.Wrap(err) } @@ -626,7 +626,7 @@ func (m *Manager) getLocalMessengerServer() (bertymessenger.MessengerServiceServ // register grpc service bertymessenger.RegisterMessengerServiceServer(grpcServer, messengerServer) - if err := bertymessenger.RegisterMessengerServiceHandlerServer(m.GetContext(), gatewayMux, messengerServer); err != nil { + if err := bertymessenger.RegisterMessengerServiceHandlerServer(m.getContext(), gatewayMux, messengerServer); err != nil { return nil, errcode.TODO.Wrap(err) } diff --git a/go/internal/initutil/notification.go b/go/internal/initutil/notification.go index b140c7865b..89079d94f6 100644 --- a/go/internal/initutil/notification.go +++ b/go/internal/initutil/notification.go @@ -27,8 +27,8 @@ func (m *Manager) SetNotificationManager(manager notification.Manager) { } func (m *Manager) GetNotificationManager() (notification.Manager, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() + return m.getNotificationManager() } diff --git a/go/internal/initutil/orbitdb.go b/go/internal/initutil/orbitdb.go index f512959bc0..4a92f5445b 100644 --- a/go/internal/initutil/orbitdb.go +++ b/go/internal/initutil/orbitdb.go @@ -62,7 +62,7 @@ func (m *Manager) getOrbitDB() (*bertyprotocol.BertyOrbitDB, error) { } if node.PubSub != nil { - self, err := ipfs.Key().Self(m.GetContext()) + self, err := ipfs.Key().Self(m.getContext()) if err != nil { return nil, errcode.TODO.Wrap(err) } @@ -70,7 +70,7 @@ func (m *Manager) getOrbitDB() (*bertyprotocol.BertyOrbitDB, error) { opts.PubSub = pubsubraw.NewPubSub(node.PubSub, self.ID(), opts.Logger, nil) } - odb, err := bertyprotocol.NewBertyOrbitDB(m.GetContext(), ipfs, opts) + odb, err := bertyprotocol.NewBertyOrbitDB(m.getContext(), ipfs, opts) if err != nil { return nil, errcode.TODO.Wrap(err) } @@ -81,8 +81,7 @@ func (m *Manager) getOrbitDB() (*bertyprotocol.BertyOrbitDB, error) { } func (m *Manager) GetOrbitDB() (*bertyprotocol.BertyOrbitDB, error) { - m.mutex.Lock() - defer m.mutex.Unlock() + defer m.prepareForGetter()() return m.getOrbitDB() }