Skip to content
This repository has been archived by the owner on Dec 19, 2022. It is now read-only.

Commit

Permalink
Feat/support oss storage (#147)
Browse files Browse the repository at this point in the history
* Sector Redo: The context passed by the command is released before the thread runs (#142)

Co-authored-by: 一页素书 <2931107265@qq.com>

* Fix bugs (#143)

* Fix the bug of SetConfig exception

* Missing some color schemes for sector status

* Repair the pieces info of the cc data modified in the verification deal (#146)

Co-authored-by: 一页素书 <2931107265@qq.com>

* use oss storage

* fix cli

Co-authored-by: Susanoo <40375298+diwufeiwen@users.noreply.github.com>
Co-authored-by: 一页素书 <2931107265@qq.com>
Co-authored-by: 问心 <85589661@qq.com>
  • Loading branch information
4 people authored Dec 7, 2021
1 parent 33c0bb0 commit 5b32c7e
Show file tree
Hide file tree
Showing 19 changed files with 185 additions and 81 deletions.
9 changes: 9 additions & 0 deletions api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,15 @@ func GetFullNodeAPIV2(cctx *cli.Context) (FullNode, jsonrpc.ClientCloser, error)
return NewFullNodeRPC(cctx.Context, addr, apiInfo.AuthHeader())
}

func GetFullNodeFromNodeConfig(ctx context.Context, cfg *config.NodeConfig) (FullNode, jsonrpc.ClientCloser, error) {
apiInfo := apiinfo.NewAPIInfo(cfg.Url, cfg.Token)
addr, err := apiInfo.DialArgs("v1")
if err != nil {
return nil, nil, xerrors.Errorf("could not get DialArgs: %w", err)
}
return NewFullNodeRPC(ctx, addr, apiInfo.AuthHeader())
}

func GetFullNodeAPIFromConfig(cctx *cli.Context) (apiinfo.APIInfo, error) {
repoPath := cctx.String("repo")
cfgPath := config.FsConfig(repoPath)
Expand Down
2 changes: 2 additions & 0 deletions api/impl/strageminer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package impl
import (
"context"
"encoding/json"
api2 "github.com/filecoin-project/venus-market/api"
types4 "github.com/filecoin-project/venus-market/types"
"net/http"
"strconv"
"time"
Expand Down
1 change: 1 addition & 0 deletions api/storage_struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"context"
types4 "github.com/filecoin-project/venus-market/types"
"time"

"github.com/google/uuid"
Expand Down
114 changes: 77 additions & 37 deletions app/venus-sealer/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/rand"
"encoding/json"
"fmt"
"github.com/filecoin-project/venus-market/piecestorage"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -130,20 +131,28 @@ var initCmd = &cli.Command{
Usage: "gateway token",
},

&cli.StringFlag{
Name: "market-url",
Usage: "market url",
},
&cli.StringFlag{
Name: "market-token",
Usage: "market token",
},

&cli.StringFlag{
Name: "auth-token",
Usage: "auth token",
},

&cli.StringFlag{
Name: "piecestorage",
Usage: "config storage for piece (eg fs:/mnt/piece s3:{access key}:{secret key}:{option token}@{region}host/{bucket}",
},
},
Action: func(cctx *cli.Context) error {
ctx := api.ReqContext(cctx)
log.Info("Initializing venus sealer")

sectorSizeInt, err := units.RAMInBytes(cctx.String("sector-size"))
if err != nil {
return err
}
ssize := abi.SectorSize(sectorSizeInt)

gasPrice, err := types.BigFromString(cctx.String("gas-premium"))
if err != nil {
return xerrors.Errorf("failed to parse gas-price flag: %s", err)
Expand All @@ -160,8 +169,11 @@ var initCmd = &cli.Command{
return err
}

setAuthToken(cctx)
parseFlag(defaultCfg, cctx)
setAuthToken(defaultCfg, cctx)
err = parseFlag(defaultCfg, cctx)
if err != nil {
return err
}
if err := checkURL(defaultCfg); err != nil {
return err
}
Expand All @@ -176,14 +188,9 @@ var initCmd = &cli.Command{
return err
}

ctx := api.ReqContext(cctx)
if err := paramfetch.GetParams(ctx, ps, srs, uint64(ssize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}

log.Info("Trying to connect to full node RPC")

fullNode, closer, err := api.GetFullNodeAPIV2(cctx) // TODO: consider storing full node address in config
fullNode, closer, err := api.GetFullNodeFromNodeConfig(ctx, &defaultCfg.Node) // TODO: consider storing full node address in config
if err != nil {
return err
}
Expand Down Expand Up @@ -282,7 +289,12 @@ var initCmd = &cli.Command{
}
}

if err := storageMinerInit(ctx, cctx, fullNode, messagerClient, defaultCfg, ssize, gasPrice); err != nil {
ssize, err := units.RAMInBytes(cctx.String("sector-size"))
if err != nil {
return fmt.Errorf("failed to parse sector size: %w", err)
}
minerAddr, err := storageMinerInit(ctx, cctx, fullNode, messagerClient, defaultCfg, abi.SectorSize(ssize), gasPrice)
if err != nil {
log.Errorf("Failed to initialize venus-miner: %+v", err)
path, err := homedir.Expand(defaultCfg.DataDir)
if err != nil {
Expand All @@ -295,23 +307,31 @@ var initCmd = &cli.Command{
return xerrors.Errorf("Storage-miner init failed")
}

minerInfo, err := fullNode.StateMinerInfo(ctx, minerAddr, types.EmptyTSK)
if err != nil {
return err
}
// TODO: Point to setting storage price, maybe do it interactively or something
log.Info("Sealer successfully created, you can now start it with 'venus-sealer run'")

if err := paramfetch.GetParams(ctx, ps, srs, uint64(minerInfo.SectorSize)); err != nil {
return xerrors.Errorf("fetching proof parameters: %w", err)
}
return nil
},
}

func setAuthToken(cctx *cli.Context) {
func setAuthToken(cfg *config.StorageMiner, cctx *cli.Context) {
if cctx.IsSet("auth-token") {
authToken := cctx.String("auth-token")
_ = cctx.Set("node-token", authToken)
_ = cctx.Set("messager-token", authToken)
_ = cctx.Set("gateway-token", authToken)
cfg.Node.Token = authToken
cfg.Messager.Token = authToken
cfg.RegisterProof.Token = authToken
cfg.RegisterMarket.Token = authToken
cfg.RegisterMarket.Token = authToken
}
}

func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) error {
cfg.DataDir = cctx.String("repo")

if cctx.IsSet("messager-url") {
Expand All @@ -326,6 +346,11 @@ func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
cfg.RegisterProof.Urls = cctx.StringSlice("gateway-url")
}

if cctx.IsSet("market-url") {
cfg.RegisterMarket.Urls = []string{cctx.String("market-url")}
cfg.Market.Url = cctx.String("market-url")
}

if cctx.IsSet("node-token") {
cfg.Node.Token = cctx.String("node-token")
}
Expand All @@ -337,6 +362,21 @@ func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) {
if cctx.IsSet("gateway-token") {
cfg.RegisterProof.Token = cctx.String("gateway-token")
}

if cctx.IsSet("market-token") {
cfg.Market.Token = cctx.String("market-token")
cfg.RegisterMarket.Token = cctx.String("market-token")
}

if cctx.IsSet("piecestorage") {
pieceStorage, err := piecestorage.ParserProtocol(cctx.String("piecestorage"))
if err != nil {
return err
}

cfg.PieceStorage = pieceStorage
}
return nil
}

func parseMultiAddr(url string) error {
Expand Down Expand Up @@ -367,76 +407,76 @@ func checkURL(cfg *config.StorageMiner) error {
return nil
}

func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, messagerClient api.IMessager, cfg *config.StorageMiner, ssize abi.SectorSize, gasPrice types.BigInt) error {
func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, messagerClient api.IMessager, cfg *config.StorageMiner, ssize abi.SectorSize, gasPrice types.BigInt) (address.Address, error) {
log.Info("Initializing libp2p identity")

repo, err := models.SetDataBase(config.HomeDir(cfg.DataDir), &cfg.DB)
if err != nil {
return err
return address.Undef, err
}
err = repo.AutoMigrate()
if err != nil {
return err
return address.Undef, err
}

metaDataService := service.NewMetadataService(repo)
sectorInfoService := service.NewSectorInfoService(repo)
p2pSk, _, err := crypto.GenerateEd25519Key(rand.Reader)
if err != nil {
return xerrors.Errorf("make host key: %w", err)
return address.Undef, xerrors.Errorf("make host key: %w", err)
}

peerid, err := peer.IDFromPrivateKey(p2pSk)
if err != nil {
return xerrors.Errorf("peer ID from private key: %w", err)
return address.Undef, xerrors.Errorf("peer ID from private key: %w", err)
}

var addr address.Address
if act := cctx.String("actor"); act != "" {
a, err := address.NewFromString(act)
if err != nil {
return xerrors.Errorf("failed parsing actor flag value (%q): %w", act, err)
return address.Undef, xerrors.Errorf("failed parsing actor flag value (%q): %w", act, err)
}

if cctx.Bool("genesis-miner") {
if err := metaDataService.SaveMinerAddress(a); err != nil {
return err
return address.Undef, err
}

if pssb := cctx.String("pre-sealed-metadata"); pssb != "" {
pssb, err := homedir.Expand(pssb)
if err != nil {
return err
return address.Undef, err
}

log.Infof("Importing pre-sealed sector metadata for %s", a)

if err := migratePreSealMeta(ctx, api, pssb, a, metaDataService, sectorInfoService); err != nil {
return xerrors.Errorf("migrating presealed sector metadata: %w", err)
return address.Undef, xerrors.Errorf("migrating presealed sector metadata: %w", err)
}
}

return nil
return a, nil
}

if pssb := cctx.String("pre-sealed-metadata"); pssb != "" {
pssb, err := homedir.Expand(pssb)
if err != nil {
return err
return address.Undef, err
}

log.Infof("Importing pre-sealed sector metadata for %s", a)

if err := migratePreSealMeta(ctx, api, pssb, a, metaDataService, sectorInfoService); err != nil {
return xerrors.Errorf("migrating presealed sector metadata: %w", err)
return address.Undef, xerrors.Errorf("migrating presealed sector metadata: %w", err)
}
}

addr = a
} else {
a, err := createStorageMiner(ctx, api, messagerClient, peerid, gasPrice, cctx)
if err != nil {
return xerrors.Errorf("creating miner failed: %w", err)
return address.Undef, xerrors.Errorf("creating miner failed: %w", err)
}

addr = a
Expand All @@ -445,10 +485,10 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode,
log.Infof("Created new miner: %s", addr)

if err := metaDataService.SaveMinerAddress(addr); err != nil {
return err
return address.Undef, err
}

return nil
return addr, nil
}

func createStorageMiner(ctx context.Context, nodeAPI api.FullNode, messagerClient api.IMessager, peerid peer.ID, gasPrice types.BigInt, cctx *cli.Context) (address.Address, error) {
Expand Down
8 changes: 8 additions & 0 deletions app/venus-sealer/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ var stateList = []stateMeta{

{col: color.FgBlue, state: types.Empty},
{col: color.FgBlue, state: types.WaitDeals},
{col: color.FgBlue, state: types.AddPiece},

{col: color.FgRed, state: types.UndefinedSectorState},
{col: color.FgYellow, state: types.Packing},
Expand All @@ -96,10 +97,15 @@ var stateList = []stateMeta{
{col: color.FgYellow, state: types.PreCommit2},
{col: color.FgYellow, state: types.PreCommitting},
{col: color.FgYellow, state: types.PreCommitWait},
{col: color.FgYellow, state: types.SubmitPreCommitBatch},
{col: color.FgYellow, state: types.PreCommitBatchWait},
{col: color.FgYellow, state: types.WaitSeed},
{col: color.FgYellow, state: types.Committing},
{col: color.FgYellow, state: types.CommitFinalize},
{col: color.FgYellow, state: types.SubmitCommit},
{col: color.FgYellow, state: types.CommitWait},
{col: color.FgYellow, state: types.SubmitCommitAggregate},
{col: color.FgYellow, state: types.CommitAggregateWait},
{col: color.FgYellow, state: types.FinalizeSector},

{col: color.FgCyan, state: types.Terminating},
Expand All @@ -110,11 +116,13 @@ var stateList = []stateMeta{
{col: color.FgCyan, state: types.Removed},

{col: color.FgRed, state: types.FailedUnrecoverable},
{col: color.FgRed, state: types.AddPieceFailed},
{col: color.FgRed, state: types.SealPreCommit1Failed},
{col: color.FgRed, state: types.SealPreCommit2Failed},
{col: color.FgRed, state: types.PreCommitFailed},
{col: color.FgRed, state: types.ComputeProofFailed},
{col: color.FgRed, state: types.CommitFailed},
{col: color.FgRed, state: types.CommitFinalizeFailed},
{col: color.FgRed, state: types.PackingFailed},
{col: color.FgRed, state: types.FinalizeFailed},
{col: color.FgRed, state: types.Faulty},
Expand Down
4 changes: 4 additions & 0 deletions builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"github.com/filecoin-project/go-state-types/abi"
storage2 "github.com/filecoin-project/specs-storage/storage"
api2 "github.com/filecoin-project/venus-market/api"
config2 "github.com/filecoin-project/venus-market/config"
"github.com/filecoin-project/venus-market/piecestorage"
"github.com/filecoin-project/venus-sealer/api"
"github.com/filecoin-project/venus-sealer/api/impl"
"github.com/filecoin-project/venus-sealer/config"
Expand Down Expand Up @@ -167,6 +169,7 @@ func Repo(cfg *config.StorageMiner) Option {
Override(new(sectorstorage.SealerConfig), cfg.Storage),
Override(new(*storage.AddressSelector), AddressSelector(&cfg.Addresses)),
Override(new(*config.DbConfig), &cfg.DB),
Override(new(*config2.PieceStorage), &cfg.PieceStorage),
Override(new(*config.StorageMiner), cfg),
Override(new(*config.MessagerConfig), &cfg.Messager),
Override(new(*config.MarketConfig), &cfg.Market),
Expand All @@ -176,6 +179,7 @@ func Repo(cfg *config.StorageMiner) Option {

Override(new(api.IMessager), api.NewMessageRPC),
Override(new(api2.MarketFullNode), api.NewMarketRPC),
Override(new(piecestorage.IPieceStorage), NewPieceStorage),
Override(new(*market_client.MarketEventClient), market_client.NewMarketEventClient),
Override(new(*proof_client.ProofEventClient), proof_client.NewProofEventClient),
Override(new(repo.Repo), models.SetDataBase),
Expand Down
Loading

0 comments on commit 5b32c7e

Please sign in to comment.