From a4e458dff5f2442bf699c81e2709355ac5cbfd73 Mon Sep 17 00:00:00 2001 From: cmcater Date: Tue, 15 Aug 2023 12:36:29 -0400 Subject: [PATCH] remove producer node --- cmd/producernode.go | 236 ---------------------------- cmd/{fullnode.go => rumlitenode.go} | 58 +++---- internal/pkg/cli/flags.go | 21 +-- pkg/chainapi/api/server.go | 68 +------- 4 files changed, 31 insertions(+), 352 deletions(-) delete mode 100644 cmd/producernode.go rename cmd/{fullnode.go => rumlitenode.go} (75%) diff --git a/cmd/producernode.go b/cmd/producernode.go deleted file mode 100644 index 7a501fb0..00000000 --- a/cmd/producernode.go +++ /dev/null @@ -1,236 +0,0 @@ -package cmd - -import ( - "context" - "fmt" - "os" - "os/signal" - "syscall" - "time" - - "github.com/fatih/color" - discovery "github.com/libp2p/go-libp2p/p2p/discovery/util" - connmgr "github.com/libp2p/go-libp2p/p2p/net/connmgr" - "github.com/rumsystem/quorum/internal/pkg/appdata" - chain "github.com/rumsystem/quorum/internal/pkg/chainsdk/core" - "github.com/rumsystem/quorum/internal/pkg/cli" - "github.com/rumsystem/quorum/internal/pkg/conn" - "github.com/rumsystem/quorum/internal/pkg/conn/p2p" - "github.com/rumsystem/quorum/internal/pkg/nodectx" - "github.com/rumsystem/quorum/internal/pkg/options" - "github.com/rumsystem/quorum/internal/pkg/storage" - chainstorage "github.com/rumsystem/quorum/internal/pkg/storage/chain" - "github.com/rumsystem/quorum/internal/pkg/utils" - "github.com/rumsystem/quorum/pkg/chainapi/api" - "github.com/rumsystem/quorum/pkg/chainapi/appapi" - localcrypto "github.com/rumsystem/quorum/pkg/crypto" - "github.com/spf13/cobra" -) - -var ( - producerNodeFlag = cli.ProducerNodeFlag{ProtocolID: "/quorum/1.0.0"} - producerNode *p2p.Node - producerSignalCh chan os.Signal -) - -var producerNodeCmd = &cobra.Command{ - Use: "producernode", - Short: "Run producernode", - Run: func(cmd *cobra.Command, args []string) { - if producerNodeFlag.KeyStorePwd == "" { - producerNodeFlag.KeyStorePwd = os.Getenv("RUM_KSPASSWD") - } - runProducerNode(producerNodeFlag) - }, -} - -func init() { - rootCmd.AddCommand(producerNodeCmd) - flags := producerNodeCmd.Flags() - flags.SortFlags = false - - flags.StringVar(&producerNodeFlag.PeerName, "peername", "peer", "peername") - flags.StringVar(&producerNodeFlag.ConfigDir, "configdir", "./config/", "config and keys dir") - flags.StringVar(&producerNodeFlag.DataDir, "datadir", "./data/", "config dir") - flags.StringVar(&producerNodeFlag.KeyStoreDir, "keystoredir", "./keystore/", "keystore dir") - flags.StringVar(&producerNodeFlag.KeyStoreName, "keystorename", "default", "keystore name") - flags.StringVar(&producerNodeFlag.KeyStorePwd, "keystorepass", "", "keystore password") - flags.Var(&producerNodeFlag.ListenAddresses, "listen", "Adds a multiaddress to the listen list, e.g.: --listen /ip4/127.0.0.1/tcp/4215 --listen /ip/127.0.0.1/tcp/5215/ws") - flags.StringVar(&producerNodeFlag.APIHost, "apihost", "", "Domain or public ip addresses for api server") - flags.UintVar(&producerNodeFlag.APIPort, "apiport", 5215, "api server listen port") - flags.StringVar(&producerNodeFlag.CertDir, "certdir", "certs", "ssl certificate directory") - flags.StringVar(&producerNodeFlag.ZeroAccessKey, "zerosslaccesskey", "", "zerossl access key, get from: https://app.zerossl.com/developer") - flags.Var(&producerNodeFlag.BootstrapPeers, "peer", "bootstrap peer address") - flags.StringVar(&producerNodeFlag.JsonTracer, "jsontracer", "", "output tracer data to a json file") - flags.BoolVar(&producerNodeFlag.IsDebug, "debug", false, "show debug log") -} - -func runProducerNode(config cli.ProducerNodeFlag) { - color.Green("Version:%s", utils.GitCommit) - const defaultKeyName = "default" - - producerSignalCh = make(chan os.Signal, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - peername := config.PeerName - - utils.EnsureDir(config.DataDir) - - nodeoptions, err := options.InitNodeOptions(config.ConfigDir, peername) - if err != nil { - cancel() - logger.Fatalf(err.Error()) - } - - nodeoptions.EnableRelay = false - - keystoreParam := InitKeystoreParam{ - KeystoreName: config.KeyStoreName, - KeystoreDir: config.KeyStoreDir, - KeystorePwd: config.KeyStorePwd, - ConfigDir: config.ConfigDir, - PeerName: config.PeerName, - DefaultKeyName: defaultKeyName, - } - - ks, defaultkey, err := InitDefaultKeystore(keystoreParam, nodeoptions) - if err != nil { - cancel() - logger.Fatalf(err.Error()) - } - - keys, err := localcrypto.SignKeytoPeerKeys(defaultkey) - if err != nil { - logger.Fatalf(err.Error()) - cancel() - } - - peerid, ethaddr, err := ks.GetPeerInfo(defaultKeyName) - if err != nil { - cancel() - logger.Fatalf(err.Error()) - } - - logger.Infof("eth addresss: <%s>", ethaddr) - CheckLockError(err) - if err != nil { - cancel() - logger.Fatalf(err.Error()) - } - - nodename := "producernode_default" - - datapath := config.DataDir + "/" + config.PeerName - dbManager, err := storage.CreateDb(datapath) - - if err != nil { - logger.Fatalf(err.Error()) - } - - newchainstorage := chainstorage.NewChainStorage(dbManager) - - //normal node connections: low watermarks: 10 hi watermarks 200, grace 60s - cm, err := connmgr.NewConnManager(10, nodeoptions.ConnsHi, connmgr.WithGracePeriod(60*time.Second)) - if err != nil { - logger.Fatalf(err.Error()) - } - producerNode, err = p2p.NewNode(ctx, nodename, nodeoptions, false, defaultkey, cm, config.ListenAddresses, []string{}, config.JsonTracer) - if err == nil { - producerNode.SetRumExchange(ctx) - } - - nodectx.InitCtx(ctx, nodename, producerNode, dbManager, newchainstorage, "pubsub", utils.GitCommit, nodectx.PRODUCER_NODE) - nodectx.GetNodeCtx().Keystore = ks - nodectx.GetNodeCtx().PublicKey = keys.PubKey - nodectx.GetNodeCtx().PeerId = peerid - - //initial conn - conn.InitConn() - - //initial group manager - chain.InitGroupMgr() - - //load all groups - err = chain.GetGroupMgr().LoadAllGroups() - if err != nil { - logger.Fatalf(err.Error()) - } - - appdb, err := appdata.CreateAppDb(datapath) - if err != nil { - logger.Fatalf(err.Error()) - } - - CheckLockError(err) - - if err := producerNode.Bootstrap(ctx, config.BootstrapPeers); err != nil { - logger.Fatal(err) - } - - for _, addr := range producerNode.Host.Addrs() { - p2paddr := fmt.Sprintf("%s/p2p/%s", addr.String(), producerNode.Host.ID()) - logger.Infof("Peer ID:<%s>, Peer Address:<%s>", producerNode.Host.ID(), p2paddr) - } - - //Discovery and Advertise had been replaced by PeerExchange - logger.Infof("Announcing ourselves...") - discovery.Advertise(ctx, producerNode.RoutingDiscovery, config.RendezvousString) - logger.Infof("Successfully announced!") - - peerok := make(chan struct{}) - go producerNode.ConnectPeers(ctx, peerok, nodeoptions.MaxPeers, config.RendezvousString) - - //start sync all groups - err = chain.GetGroupMgr().StartSyncAllGroups() - if err != nil { - logger.Fatalf(err.Error()) - } - - //run local http api service - apiaddress := fmt.Sprintf("http://localhost:%d/api/v1", config.APIPort) - apph := &appapi.Handler{ - Appdb: appdb, - Trxdb: newchainstorage, - GitCommit: utils.GitCommit, - Apiroot: apiaddress, - ConfigDir: config.ConfigDir, - PeerName: config.PeerName, - NodeName: nodectx.GetNodeCtx().Name, - } - - h := &api.Handler{ - Node: producerNode, - NodeCtx: nodectx.GetNodeCtx(), - Ctx: ctx, - GitCommit: utils.GitCommit, - Appdb: appdb, - ChainAPIdb: newchainstorage, - } - - startParam := api.StartServerParam{ - IsDebug: config.IsDebug, - APIHost: config.APIHost, - APIPort: config.APIPort, - CertDir: config.CertDir, - ZeroAccessKey: config.ZeroAccessKey, - } - - go api.StartProducerServer(startParam, producerSignalCh, h, apph, producerNode, nodeoptions, ks, ethaddr) - - //attach signal - signal.Notify(producerSignalCh, os.Interrupt, os.Kill, syscall.SIGTERM) - signalType := <-producerSignalCh - signal.Stop(producerSignalCh) - - //Stop sync all groups - chain.GetGroupMgr().StopSyncAllGroups() - //teardown all groups - chain.GetGroupMgr().TeardownAllGroups() - //close ctx db - nodectx.GetDbMgr().CloseDb() - - //cleanup before exit - logger.Infof("On Signal <%s>", signalType) - logger.Infof("Exit command received. Exiting...") -} diff --git a/cmd/fullnode.go b/cmd/rumlitenode.go similarity index 75% rename from cmd/fullnode.go rename to cmd/rumlitenode.go index e3f5c866..aa0f1e5c 100644 --- a/cmd/fullnode.go +++ b/cmd/rumlitenode.go @@ -31,48 +31,48 @@ import ( ) var ( - fnodeFlag = cli.FullNodeFlag{ProtocolID: "/quorum/1.0.0"} + rlnodeFlag = cli.RumLiteNodeFlag{ProtocolID: "/quorum/1.0.0"} fullNode *p2p.Node fullNodeSignalch chan os.Signal ) -var fullnodeCmd = &cobra.Command{ - Use: "fullnode", - Short: "Run fullnode", +var rumlitenodeCmd = &cobra.Command{ + Use: "rumlitenode", + Short: "Run rumlite node", Run: func(cmd *cobra.Command, args []string) { - if fnodeFlag.KeyStorePwd == "" { - fnodeFlag.KeyStorePwd = os.Getenv("RUM_KSPASSWD") + if rlnodeFlag.KeyStorePwd == "" { + rlnodeFlag.KeyStorePwd = os.Getenv("RUM_KSPASSWD") } - fnodeFlag.IsDebug = isDebug - runFullnode(fnodeFlag) + rlnodeFlag.IsDebug = isDebug + runRumLiteNode(rlnodeFlag) }, } func init() { - rootCmd.AddCommand(fullnodeCmd) + rootCmd.AddCommand(rumlitenodeCmd) - flags := fullnodeCmd.Flags() + flags := rumlitenodeCmd.Flags() flags.SortFlags = false - flags.StringVar(&fnodeFlag.PeerName, "peername", "peer", "peername") - flags.StringVar(&fnodeFlag.ConfigDir, "configdir", "./config/", "config and keys dir") - flags.StringVar(&fnodeFlag.DataDir, "datadir", "./data/", "data dir") - flags.StringVar(&fnodeFlag.KeyStoreDir, "keystoredir", "./keystore/", "keystore dir") - flags.StringVar(&fnodeFlag.KeyStoreName, "keystorename", "default", "keystore name") - flags.StringVar(&fnodeFlag.KeyStorePwd, "keystorepass", "", "keystore password") - flags.Var(&fnodeFlag.ListenAddresses, "listen", "Adds a multiaddress to the listen list, e.g.: --listen /ip4/127.0.0.1/tcp/4215 --listen /ip/127.0.0.1/tcp/5215/ws") - flags.StringVar(&fnodeFlag.APIHost, "apihost", "", "Domain or public ip addresses for api server") - flags.UintVar(&fnodeFlag.APIPort, "apiport", 5215, "api server listen port") - flags.StringVar(&fnodeFlag.CertDir, "certdir", "certs", "ssl certificate directory") - flags.StringVar(&fnodeFlag.ZeroAccessKey, "zerosslaccesskey", "", "zerossl access key, get from: https://app.zerossl.com/developer") - flags.Var(&fnodeFlag.BootstrapPeers, "peer", "bootstrap peer address") - flags.StringVar(&fnodeFlag.SkipPeers, "skippeers", "", "peer id lists, will be skipped in the pubsub connection") - flags.StringVar(&fnodeFlag.JsonTracer, "jsontracer", "", "output tracer data to a json file") - flags.BoolVar(&fnodeFlag.AutoAck, "autoack", true, "auto ack the transactions in pubqueue") - flags.BoolVar(&fnodeFlag.EnableRelay, "autorelay", true, "enable relay") + flags.StringVar(&rlnodeFlag.PeerName, "peername", "peer", "peername") + flags.StringVar(&rlnodeFlag.ConfigDir, "configdir", "./config/", "config and keys dir") + flags.StringVar(&rlnodeFlag.DataDir, "datadir", "./data/", "data dir") + flags.StringVar(&rlnodeFlag.KeyStoreDir, "keystoredir", "./keystore/", "keystore dir") + flags.StringVar(&rlnodeFlag.KeyStoreName, "keystorename", "default", "keystore name") + flags.StringVar(&rlnodeFlag.KeyStorePwd, "keystorepass", "", "keystore password") + flags.Var(&rlnodeFlag.ListenAddresses, "listen", "Adds a multiaddress to the listen list, e.g.: --listen /ip4/127.0.0.1/tcp/4215 --listen /ip/127.0.0.1/tcp/5215/ws") + flags.StringVar(&rlnodeFlag.APIHost, "apihost", "", "Domain or public ip addresses for api server") + flags.UintVar(&rlnodeFlag.APIPort, "apiport", 5215, "api server listen port") + flags.StringVar(&rlnodeFlag.CertDir, "certdir", "certs", "ssl certificate directory") + flags.StringVar(&rlnodeFlag.ZeroAccessKey, "zerosslaccesskey", "", "zerossl access key, get from: https://app.zerossl.com/developer") + flags.Var(&rlnodeFlag.BootstrapPeers, "peer", "bootstrap peer address") + flags.StringVar(&rlnodeFlag.SkipPeers, "skippeers", "", "peer id lists, will be skipped in the pubsub connection") + flags.StringVar(&rlnodeFlag.JsonTracer, "jsontracer", "", "output tracer data to a json file") + flags.BoolVar(&rlnodeFlag.AutoAck, "autoack", true, "auto ack the transactions in pubqueue") + flags.BoolVar(&rlnodeFlag.EnableRelay, "autorelay", true, "enable relay") } -func runFullnode(config cli.FullNodeFlag) { +func runRumLiteNode(config cli.RumLiteNodeFlag) { // NOTE: hardcode const defaultKeyName = "default" @@ -132,7 +132,7 @@ func runFullnode(config cli.FullNodeFlag) { logger.Fatalf(err.Error()) } - nodename := "fullnode_default" + nodename := "rumlitenode_default" datapath := config.DataDir + "/" + config.PeerName dbManager, err := storage.CreateDb(datapath) @@ -236,7 +236,7 @@ func runFullnode(config cli.FullNodeFlag) { CertDir: config.CertDir, ZeroAccessKey: config.ZeroAccessKey, } - go api.StartFullNodeServer(startParam, fullNodeSignalch, h, apph, fullNode, nodeoptions, ks, ethaddr) + go api.StartRumLiteNodeServer(startParam, fullNodeSignalch, h, apph, fullNode, nodeoptions, ks, ethaddr) //attach signal signal.Notify(fullNodeSignalch, os.Interrupt, syscall.SIGTERM) diff --git a/internal/pkg/cli/flags.go b/internal/pkg/cli/flags.go index 01a2c584..cc13a595 100644 --- a/internal/pkg/cli/flags.go +++ b/internal/pkg/cli/flags.go @@ -8,7 +8,7 @@ import ( type AddrList []maddr.Multiaddr -type FullNodeFlag struct { +type RumLiteNodeFlag struct { RendezvousString string BootstrapPeers AddrList ListenAddresses AddrList @@ -79,25 +79,6 @@ type RelayNodeFlag struct { IsDebug bool } -type ProducerNodeFlag struct { - RendezvousString string - BootstrapPeers AddrList - ListenAddresses AddrList - APIHost string - APIPort uint - CertDir string - ZeroAccessKey string - ProtocolID string - PeerName string - JsonTracer string - IsDebug bool - ConfigDir string - DataDir string - KeyStoreDir string - KeyStoreName string - KeyStorePwd string -} - func (al *AddrList) String() string { strs := make([]string, len(*al)) for i, addr := range *al { diff --git a/pkg/chainapi/api/server.go b/pkg/chainapi/api/server.go index 3fbcfc0a..0f302852 100644 --- a/pkg/chainapi/api/server.go +++ b/pkg/chainapi/api/server.go @@ -64,74 +64,8 @@ func StartBootstrapNodeServer(config StartServerParam, signalch chan os.Signal, } } -func StartProducerServer(config StartServerParam, signalch chan os.Signal, h *Handler, apph *appapi.Handler, node *p2p.Node, nodeopt *options.NodeOptions, ks localcrypto.Keystore, ethaddr string) { - quitch = signalch - e := utils.NewEcho(config.IsDebug) - customJWTConfig := appapi.CustomJWTConfig(nodeopt.JWT.Key) - e.Use(middleware.JWTWithConfig(customJWTConfig)) - e.Use(rummiddleware.OpaWithConfig(rummiddleware.OpaConfig{ - Skipper: rummiddleware.LocalhostSkipper, - Policy: policyStr, - Query: "x = data.quorum.restapi.authz.allow", // FIXME: hardcode - InputFunc: opaInputFunc, - })) - r := e.Group("/api") - a := e.Group("/app/api") - r.GET("/quit", quitapp) - - //r.POST("/v1/group", h.CreateGroupUrl()) - //r.POST("/v1/group/join", h.JoinGroup()) - r.POST("/v2/group/join", h.JoinGroupV2()) - r.POST("/v1/group/leave", h.LeaveGroup) - r.POST("/v1/group/clear", h.ClearGroupData) - r.POST("/v1/group/announce", h.Announce) - - r.GET("/v1/node", h.GetNodeInfo) - r.GET("/v1/network", h.GetNetwork(&node.Host, node.Info, nodeopt, ethaddr)) - //r.GET("/v1/network/stats", h.GetNetworkStatsSummary) - r.GET("/v1/block/:group_id/:block_id", h.GetBlock) - r.GET("/v1/trx/:group_id/:trx_id", h.GetTrx) - - r.GET("/v1/groups", h.GetGroups) - r.GET("/v1/group/:group_id", h.GetGroupById) - r.GET("/v1/group/:group_id/trx/allowlist", h.GetChainTrxAllowList) - r.GET("/v1/group/:group_id/trx/denylist", h.GetChainTrxDenyList) - r.GET("/v1/group/:group_id/trx/auth/:trx_type", h.GetChainTrxAuthMode) - //TBD - //r.GET("/v1/group/:group_id/conensus", h.GetGroupConsensus) - r.GET("/v1/group/:group_id/announced/users", h.GetAnnouncedUsers) - r.GET("/v1/group/:group_id/announced/user/:sign_pubkey", h.GetAnnouncedUser) - r.GET("/v1/group/:group_id/announced/producers", h.GetAnnouncedProducers) - r.GET("/v1/group/:group_id/seed", h.GetGroupSeedHandler) - - //app api - a.POST("/v1/token", apph.CreateToken) - a.DELETE("/v1/token", apph.RemoveToken) - a.POST("/v1/token/refresh", apph.RefreshToken) - a.POST("/v1/token/revoke", apph.RevokeToken) - a.GET("/v1/token/list", apph.ListToken) - - // start https or http server - host := config.APIHost - if utils.IsDomainName(host) { // domain - e.AutoTLSManager.Cache = autocert.DirCache(config.CertDir) - e.AutoTLSManager.HostPolicy = autocert.HostWhitelist(config.APIHost) - e.AutoTLSManager.Prompt = autocert.AcceptTOS - e.Logger.Fatal(e.StartAutoTLS(fmt.Sprintf(":%d", config.APIPort))) - } else if utils.IsPublicIP(host) { // public ip - ip := net.ParseIP(host) - privKeyPath, certPath, err := zerossl.IssueIPCert(config.CertDir, ip, config.ZeroAccessKey) - if err != nil { - e.Logger.Fatal(err) - } - e.Logger.Fatal(e.StartTLS(fmt.Sprintf(":%d", config.APIPort), certPath, privKeyPath)) - } else { // start http server - e.Logger.Fatal(e.Start(fmt.Sprintf("%s:%d", host, config.APIPort))) - } -} - // StartAPIServer : Start local web server -func StartFullNodeServer(config StartServerParam, signalch chan os.Signal, h *Handler, apph *appapi.Handler, node *p2p.Node, nodeopt *options.NodeOptions, ks localcrypto.Keystore, ethaddr string) { +func StartRumLiteNodeServer(config StartServerParam, signalch chan os.Signal, h *Handler, apph *appapi.Handler, node *p2p.Node, nodeopt *options.NodeOptions, ks localcrypto.Keystore, ethaddr string) { quitch = signalch e := utils.NewEcho(config.IsDebug) customJWTConfig := appapi.CustomJWTConfig(nodeopt.JWT.Key)