Skip to content
This repository has been archived by the owner on Dec 19, 2022. It is now read-only.

Commit

Permalink
Feat/support oss storage (#147)
Browse files Browse the repository at this point in the history
* Sector Redo: The context passed by the command is released before the thread runs (#142)

Co-authored-by: 一页素书 <2931107265@qq.com>

* Fix bugs (#143)

* Fix the bug of SetConfig exception

* Missing some color schemes for sector status

* Repair the pieces info of the cc data modified in the verification deal (#146)

Co-authored-by: 一页素书 <2931107265@qq.com>

* use oss storage

* fix cli

Co-authored-by: Susanoo <40375298+diwufeiwen@users.noreply.github.com>
Co-authored-by: 一页素书 <2931107265@qq.com>
Co-authored-by: 问心 <85589661@qq.com>
  • Loading branch information
4 people committed Jan 17, 2022
1 parent aac4a27 commit 3650510
Show file tree
Hide file tree
Showing 14 changed files with 142 additions and 61 deletions.
9 changes: 9 additions & 0 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func GetFullNodeAPIV2(cctx *cli.Context) (FullNode, jsonrpc.ClientCloser, error)
return NewFullNodeRPC(cctx.Context, addr, apiInfo.AuthHeader())
}

func GetFullNodeFromNodeConfig(ctx context.Context, cfg *config.NodeConfig) (FullNode, jsonrpc.ClientCloser, error) {
apiInfo := apiinfo.NewAPIInfo(cfg.Url, cfg.Token)
addr, err := apiInfo.DialArgs("v1")
if err != nil {
return nil, nil, xerrors.Errorf("could not get DialArgs: %w", err)
}
return NewFullNodeRPC(ctx, addr, apiInfo.AuthHeader())
}

func GetFullNodeAPIFromConfig(cctx *cli.Context) (apiinfo.APIInfo, error) {
repoPath := cctx.String("repo")
cfgPath := config.FsConfig(repoPath)
Expand Down
2 changes: 2 additions & 0 deletions api/impl/strageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package impl
import (
"context"
"encoding/json"
api2 "github.com/filecoin-project/venus-market/api"
types4 "github.com/filecoin-project/venus-market/types"
"net/http"
"strconv"
"time"
Expand Down
1 change: 1 addition & 0 deletions api/storage_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
types4 "github.com/filecoin-project/venus-market/types"
"time"

"github.com/google/uuid"
Expand Down
114 changes: 77 additions & 37 deletions app/venus-sealer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"github.com/filecoin-project/venus-market/piecestorage"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -130,20 +131,28 @@ var initCmd = &cli.Command{
Usage: "gateway token",
},

&cli.StringFlag{
Name: "market-url",
Usage: "market url",
},
&cli.StringFlag{
Name: "market-token",
Usage: "market token",
},

&cli.StringFlag{
Name: "auth-token",
Usage: "auth token",
},

&cli.StringFlag{
Name: "piecestorage",
Usage: "config storage for piece (eg fs:/mnt/piece s3:{access key}:{secret key}:{option token}@{region}host/{bucket}",
},
},
Action: func(cctx *cli.Context) error {
ctx := api.ReqContext(cctx)
log.Info("Initializing venus sealer")

sectorSizeInt, err := units.RAMInBytes(cctx.String("sector-size"))
if err != nil {
return err
}
ssize := abi.SectorSize(sectorSizeInt)

gasPrice, err := types.BigFromString(cctx.String("gas-premium"))
if err != nil {
return xerrors.Errorf("failed to parse gas-price flag: %s", err)
Expand All @@ -160,8 +169,11 @@ var initCmd = &cli.Command{
return err
}

setAuthToken(cctx)
parseFlag(defaultCfg, cctx)
setAuthToken(defaultCfg, cctx)
err = parseFlag(defaultCfg, cctx)
if err != nil {
return err
}
if err := checkURL(defaultCfg); err != nil {
return err
}
Expand All @@ -176,14 +188,9 @@ var initCmd = &cli.Command{
return err
}

ctx := api.ReqContext(cctx)
if err := paramfetch.GetParams(ctx, ps, srs, uint64(ssize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}

log.Info("Trying to connect to full node RPC")

fullNode, closer, err := api.GetFullNodeAPIV2(cctx) // TODO: consider storing full node address in config
fullNode, closer, err := api.GetFullNodeFromNodeConfig(ctx, &defaultCfg.Node) // TODO: consider storing full node address in config
if err != nil {
return err
}
Expand Down Expand Up @@ -282,7 +289,12 @@ var initCmd = &cli.Command{
}
}

if err := storageMinerInit(ctx, cctx, fullNode, messagerClient, defaultCfg, ssize, gasPrice); err != nil {
ssize, err := units.RAMInBytes(cctx.String("sector-size"))
if err != nil {
return fmt.Errorf("failed to parse sector size: %w", err)
}
minerAddr, err := storageMinerInit(ctx, cctx, fullNode, messagerClient, defaultCfg, abi.SectorSize(ssize), gasPrice)
if err != nil {
log.Errorf("Failed to initialize venus-miner: %+v", err)
path, err := homedir.Expand(defaultCfg.DataDir)
if err != nil {
Expand All @@ -295,23 +307,31 @@ var initCmd = &cli.Command{
return xerrors.Errorf("Storage-miner init failed")
}

minerInfo, err := fullNode.StateMinerInfo(ctx, minerAddr, types.EmptyTSK)
if err != nil {
return err
}
// TODO: Point to setting storage price, maybe do it interactively or something
log.Info("Sealer successfully created, you can now start it with 'venus-sealer run'")

if err := paramfetch.GetParams(ctx, ps, srs, uint64(minerInfo.SectorSize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}
return nil
},
}

func setAuthToken(cctx *cli.Context) {
func setAuthToken(cfg *config.StorageMiner, cctx *cli.Context) {
if cctx.IsSet("auth-token") {
authToken := cctx.String("auth-token")
_ = cctx.Set("node-token", authToken)
_ = cctx.Set("messager-token", authToken)
_ = cctx.Set("gateway-token", authToken)
cfg.Node.Token = authToken
cfg.Messager.Token = authToken
cfg.RegisterProof.Token = authToken
cfg.RegisterMarket.Token = authToken
cfg.RegisterMarket.Token = authToken
}
}

func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) error {
cfg.DataDir = cctx.String("repo")

if cctx.IsSet("messager-url") {
Expand All @@ -326,6 +346,11 @@ func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
cfg.RegisterProof.Urls = cctx.StringSlice("gateway-url")
}

if cctx.IsSet("market-url") {
cfg.RegisterMarket.Urls = []string{cctx.String("market-url")}
cfg.Market.Url = cctx.String("market-url")
}

if cctx.IsSet("node-token") {
cfg.Node.Token = cctx.String("node-token")
}
Expand All @@ -337,6 +362,21 @@ func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
if cctx.IsSet("gateway-token") {
cfg.RegisterProof.Token = cctx.String("gateway-token")
}

if cctx.IsSet("market-token") {
cfg.Market.Token = cctx.String("market-token")
cfg.RegisterMarket.Token = cctx.String("market-token")
}

if cctx.IsSet("piecestorage") {
pieceStorage, err := piecestorage.ParserProtocol(cctx.String("piecestorage"))
if err != nil {
return err
}

cfg.PieceStorage = pieceStorage
}
return nil
}

func parseMultiAddr(url string) error {
Expand Down Expand Up @@ -367,76 +407,76 @@ func checkURL(cfg *config.StorageMiner) error {
return nil
}

func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, messagerClient api.IMessager, cfg *config.StorageMiner, ssize abi.SectorSize, gasPrice types.BigInt) error {
func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, messagerClient api.IMessager, cfg *config.StorageMiner, ssize abi.SectorSize, gasPrice types.BigInt) (address.Address, error) {
log.Info("Initializing libp2p identity")

repo, err := models.SetDataBase(config.HomeDir(cfg.DataDir), &cfg.DB)
if err != nil {
return err
return address.Undef, err
}
err = repo.AutoMigrate()
if err != nil {
return err
return address.Undef, err
}

metaDataService := service.NewMetadataService(repo)
sectorInfoService := service.NewSectorInfoService(repo)
p2pSk, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return xerrors.Errorf("make host key: %w", err)
return address.Undef, xerrors.Errorf("make host key: %w", err)
}

peerid, err := peer.IDFromPrivateKey(p2pSk)
if err != nil {
return xerrors.Errorf("peer ID from private key: %w", err)
return address.Undef, xerrors.Errorf("peer ID from private key: %w", err)
}

var addr address.Address
if act := cctx.String("actor"); act != "" {
a, err := address.NewFromString(act)
if err != nil {
return xerrors.Errorf("failed parsing actor flag value (%q): %w", act, err)
return address.Undef, xerrors.Errorf("failed parsing actor flag value (%q): %w", act, err)
}

if cctx.Bool("genesis-miner") {
if err := metaDataService.SaveMinerAddress(a); err != nil {
return err
return address.Undef, err
}

if pssb := cctx.String("pre-sealed-metadata"); pssb != "" {
pssb, err := homedir.Expand(pssb)
if err != nil {
return err
return address.Undef, err
}

log.Infof("Importing pre-sealed sector metadata for %s", a)

if err := migratePreSealMeta(ctx, api, pssb, a, metaDataService, sectorInfoService); err != nil {
return xerrors.Errorf("migrating presealed sector metadata: %w", err)
return address.Undef, xerrors.Errorf("migrating presealed sector metadata: %w", err)
}
}

return nil
return a, nil
}

if pssb := cctx.String("pre-sealed-metadata"); pssb != "" {
pssb, err := homedir.Expand(pssb)
if err != nil {
return err
return address.Undef, err
}

log.Infof("Importing pre-sealed sector metadata for %s", a)

if err := migratePreSealMeta(ctx, api, pssb, a, metaDataService, sectorInfoService); err != nil {
return xerrors.Errorf("migrating presealed sector metadata: %w", err)
return address.Undef, xerrors.Errorf("migrating presealed sector metadata: %w", err)
}
}

addr = a
} else {
a, err := createStorageMiner(ctx, api, messagerClient, peerid, gasPrice, cctx)
if err != nil {
return xerrors.Errorf("creating miner failed: %w", err)
return address.Undef, xerrors.Errorf("creating miner failed: %w", err)
}

addr = a
Expand All @@ -445,10 +485,10 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode,
log.Infof("Created new miner: %s", addr)

if err := metaDataService.SaveMinerAddress(addr); err != nil {
return err
return address.Undef, err
}

return nil
return addr, nil
}

func createStorageMiner(ctx context.Context, nodeAPI api.FullNode, messagerClient api.IMessager, peerid peer.ID, gasPrice types.BigInt, cctx *cli.Context) (address.Address, error) {
Expand Down
4 changes: 4 additions & 0 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/filecoin-project/go-state-types/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
api2 "github.com/filecoin-project/venus-market/api"
config2 "github.com/filecoin-project/venus-market/config"
"github.com/filecoin-project/venus-market/piecestorage"
"github.com/filecoin-project/venus-sealer/api"
"github.com/filecoin-project/venus-sealer/api/impl"
"github.com/filecoin-project/venus-sealer/config"
Expand Down Expand Up @@ -167,6 +169,7 @@ func Repo(cfg *config.StorageMiner) Option {
Override(new(sectorstorage.SealerConfig), cfg.Storage),
Override(new(*storage.AddressSelector), AddressSelector(&cfg.Addresses)),
Override(new(*config.DbConfig), &cfg.DB),
Override(new(*config2.PieceStorage), &cfg.PieceStorage),
Override(new(*config.StorageMiner), cfg),
Override(new(*config.MessagerConfig), &cfg.Messager),
Override(new(*config.MarketConfig), &cfg.Market),
Expand All @@ -176,6 +179,7 @@ func Repo(cfg *config.StorageMiner) Option {

Override(new(api.IMessager), api.NewMessageRPC),
Override(new(api2.MarketFullNode), api.NewMarketRPC),
Override(new(piecestorage.IPieceStorage), NewPieceStorage),
Override(new(*market_client.MarketEventClient), market_client.NewMarketEventClient),
Override(new(*proof_client.ProofEventClient), proof_client.NewProofEventClient),
Override(new(repo.Repo), models.SetDataBase),
Expand Down
28 changes: 15 additions & 13 deletions config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package config

import (
"encoding"
"github.com/filecoin-project/venus-market/config"
"net/http"
"strings"
"time"
Expand All @@ -25,7 +26,6 @@ const (
RetrievalPricingExternalMode = "external"
)


type HomeDir string

type StorageWorker struct {
Expand All @@ -41,19 +41,21 @@ func (cfg StorageWorker) LocalStorage() *LocalStorage {

// StorageMiner is a miner config
type StorageMiner struct {
DataDir string
API API
Dealmaking DealmakingConfig
Sealing SealingConfig
Storage sectorstorage.SealerConfig
Fees MinerFeeConfig
Addresses MinerAddressConfig
NetParams NetParamsConfig
DB DbConfig
Node NodeConfig
JWT JWTConfig
Messager MessagerConfig
DataDir string
API API
Dealmaking DealmakingConfig
Sealing SealingConfig
Storage sectorstorage.SealerConfig
Fees MinerFeeConfig
Addresses MinerAddressConfig
NetParams NetParamsConfig
DB DbConfig
Node NodeConfig
JWT JWTConfig
Messager MessagerConfig

Market MarketConfig
PieceStorage config.PieceStorage
RegisterProof RegisterProofConfig
RegisterMarket RegisterMarketConfig

Expand Down
8 changes: 4 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ require (
github.com/filecoin-project/specs-actors/v6 v6.0.1
github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506
github.com/filecoin-project/venus v1.1.3-rc1
github.com/filecoin-project/venus-market v1.0.2-0.20211116071853-d29d45417e0c
github.com/filecoin-project/venus-messager v1.2.0
github.com/filecoin-project/venus-market v1.0.2-0.20211206084715-070874dc17eb
github.com/filecoin-project/venus-messager v1.2.2-rc1.0.20211201075617-c9dd295b905c
github.com/gbrlsnchs/jwt/v3 v3.0.0
github.com/golang/mock v1.6.0
github.com/google/uuid v1.2.0
Expand Down Expand Up @@ -71,10 +71,10 @@ require (
github.com/whyrusleeping/cbor-gen v0.0.0-20210713220151-be142a5ae1a8
github.com/zbiljic/go-filelock v0.0.0-20170914061330-1dbf7103ab7d
go.opencensus.io v0.23.0
go.uber.org/fx v1.13.1
go.uber.org/fx v1.15.0
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20210525063256-abc453219eb5
golang.org/x/net v0.0.0-20210614182718-04defd469f4e
golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1
gorm.io/driver/mysql v1.1.1
Expand Down
Loading

0 comments on commit 3650510

Please sign in to comment.