From 5b32c7eb51bf2d1e5ef2ef4c20c6e618b9096bb2 Mon Sep 17 00:00:00 2001 From: Mike <41407352+hunjixin@users.noreply.github.com> Date: Tue, 7 Dec 2021 11:15:24 +0800 Subject: [PATCH] Feat/support oss storage (#147) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Sector Redo: The context passed by the command is released before the thread runs (#142) Co-authored-by: 一页素书 <2931107265@qq.com> * Fix bugs (#143) * Fix the bug of SetConfig exception * Missing some color schemes for sector status * Repair the pieces info of the cc data modified in the verification deal (#146) Co-authored-by: 一页素书 <2931107265@qq.com> * use oss storage * fix cli Co-authored-by: Susanoo <40375298+diwufeiwen@users.noreply.github.com> Co-authored-by: 一页素书 <2931107265@qq.com> Co-authored-by: 问心 <85589661@qq.com> --- api/client.go | 9 ++ api/impl/strageminer.go | 2 + api/storage_struct.go | 1 + app/venus-sealer/init.go | 114 ++++++++++++++++-------- app/venus-sealer/util.go | 8 ++ builder.go | 4 + config/def.go | 28 +++--- config/local_storage.go | 4 +- go.mod | 12 +-- market_client/market_event.go | 5 +- market_client/modules.go | 4 +- modules.go | 10 ++- sector-storage/ffiwrapper/sealer_cgo.go | 1 + storage-sealing/deals.go | 1 + storage-sealing/garbage.go | 2 +- storage-sealing/sealing.go | 9 +- storage-sealing/states_sealing.go | 44 +++++---- storage/adapter_storage_miner.go | 2 + storage/miner.go | 6 +- 19 files changed, 185 insertions(+), 81 deletions(-) diff --git a/api/client.go b/api/client.go index b7d9b49a..ca0ea5bb 100644 --- a/api/client.go +++ b/api/client.go @@ -44,6 +44,15 @@ func GetFullNodeAPIV2(cctx *cli.Context) (FullNode, jsonrpc.ClientCloser, error) return NewFullNodeRPC(cctx.Context, addr, apiInfo.AuthHeader()) } +func GetFullNodeFromNodeConfig(ctx context.Context, cfg *config.NodeConfig) (FullNode, jsonrpc.ClientCloser, error) { + apiInfo := apiinfo.NewAPIInfo(cfg.Url, cfg.Token) + addr, err := apiInfo.DialArgs("v1") + if err != nil { + return nil, nil, xerrors.Errorf("could not get DialArgs: %w", err) + } + return NewFullNodeRPC(ctx, addr, apiInfo.AuthHeader()) +} + func GetFullNodeAPIFromConfig(cctx *cli.Context) (apiinfo.APIInfo, error) { repoPath := cctx.String("repo") cfgPath := config.FsConfig(repoPath) diff --git a/api/impl/strageminer.go b/api/impl/strageminer.go index 14c6e271..5c19d59e 100644 --- a/api/impl/strageminer.go +++ b/api/impl/strageminer.go @@ -3,6 +3,8 @@ package impl import ( "context" "encoding/json" + api2 "github.com/filecoin-project/venus-market/api" + types4 "github.com/filecoin-project/venus-market/types" "net/http" "strconv" "time" diff --git a/api/storage_struct.go b/api/storage_struct.go index 11c7829c..3c1f67af 100644 --- a/api/storage_struct.go +++ b/api/storage_struct.go @@ -2,6 +2,7 @@ package api import ( "context" + types4 "github.com/filecoin-project/venus-market/types" "time" "github.com/google/uuid" diff --git a/app/venus-sealer/init.go b/app/venus-sealer/init.go index 30fa5dcc..0a3acd64 100644 --- a/app/venus-sealer/init.go +++ b/app/venus-sealer/init.go @@ -6,6 +6,7 @@ import ( "crypto/rand" "encoding/json" "fmt" + "github.com/filecoin-project/venus-market/piecestorage" "io/ioutil" "os" "path/filepath" @@ -130,20 +131,28 @@ var initCmd = &cli.Command{ Usage: "gateway token", }, + &cli.StringFlag{ + Name: "market-url", + Usage: "market url", + }, + &cli.StringFlag{ + Name: "market-token", + Usage: "market token", + }, + &cli.StringFlag{ Name: "auth-token", Usage: "auth token", }, + + &cli.StringFlag{ + Name: "piecestorage", + Usage: "config storage for piece (eg fs:/mnt/piece s3:{access key}:{secret key}:{option token}@{region}host/{bucket}", + }, }, Action: func(cctx *cli.Context) error { + ctx := api.ReqContext(cctx) log.Info("Initializing venus sealer") - - sectorSizeInt, err := units.RAMInBytes(cctx.String("sector-size")) - if err != nil { - return err - } - ssize := abi.SectorSize(sectorSizeInt) - gasPrice, err := types.BigFromString(cctx.String("gas-premium")) if err != nil { return xerrors.Errorf("failed to parse gas-price flag: %s", err) @@ -160,8 +169,11 @@ var initCmd = &cli.Command{ return err } - setAuthToken(cctx) - parseFlag(defaultCfg, cctx) + setAuthToken(defaultCfg, cctx) + err = parseFlag(defaultCfg, cctx) + if err != nil { + return err + } if err := checkURL(defaultCfg); err != nil { return err } @@ -176,14 +188,9 @@ var initCmd = &cli.Command{ return err } - ctx := api.ReqContext(cctx) - if err := paramfetch.GetParams(ctx, ps, srs, uint64(ssize)); err != nil { - return xerrors.Errorf("fetching proof parameters: %w", err) - } - log.Info("Trying to connect to full node RPC") - fullNode, closer, err := api.GetFullNodeAPIV2(cctx) // TODO: consider storing full node address in config + fullNode, closer, err := api.GetFullNodeFromNodeConfig(ctx, &defaultCfg.Node) // TODO: consider storing full node address in config if err != nil { return err } @@ -282,7 +289,12 @@ var initCmd = &cli.Command{ } } - if err := storageMinerInit(ctx, cctx, fullNode, messagerClient, defaultCfg, ssize, gasPrice); err != nil { + ssize, err := units.RAMInBytes(cctx.String("sector-size")) + if err != nil { + return fmt.Errorf("failed to parse sector size: %w", err) + } + minerAddr, err := storageMinerInit(ctx, cctx, fullNode, messagerClient, defaultCfg, abi.SectorSize(ssize), gasPrice) + if err != nil { log.Errorf("Failed to initialize venus-miner: %+v", err) path, err := homedir.Expand(defaultCfg.DataDir) if err != nil { @@ -295,23 +307,31 @@ var initCmd = &cli.Command{ return xerrors.Errorf("Storage-miner init failed") } + minerInfo, err := fullNode.StateMinerInfo(ctx, minerAddr, types.EmptyTSK) + if err != nil { + return err + } // TODO: Point to setting storage price, maybe do it interactively or something log.Info("Sealer successfully created, you can now start it with 'venus-sealer run'") - + if err := paramfetch.GetParams(ctx, ps, srs, uint64(minerInfo.SectorSize)); err != nil { + return xerrors.Errorf("fetching proof parameters: %w", err) + } return nil }, } -func setAuthToken(cctx *cli.Context) { +func setAuthToken(cfg *config.StorageMiner, cctx *cli.Context) { if cctx.IsSet("auth-token") { authToken := cctx.String("auth-token") - _ = cctx.Set("node-token", authToken) - _ = cctx.Set("messager-token", authToken) - _ = cctx.Set("gateway-token", authToken) + cfg.Node.Token = authToken + cfg.Messager.Token = authToken + cfg.RegisterProof.Token = authToken + cfg.RegisterMarket.Token = authToken + cfg.RegisterMarket.Token = authToken } } -func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) { +func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) error { cfg.DataDir = cctx.String("repo") if cctx.IsSet("messager-url") { @@ -326,6 +346,11 @@ func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) { cfg.RegisterProof.Urls = cctx.StringSlice("gateway-url") } + if cctx.IsSet("market-url") { + cfg.RegisterMarket.Urls = []string{cctx.String("market-url")} + cfg.Market.Url = cctx.String("market-url") + } + if cctx.IsSet("node-token") { cfg.Node.Token = cctx.String("node-token") } @@ -337,6 +362,21 @@ func parseFlag(cfg *config.StorageMiner, cctx *cli.Context) { if cctx.IsSet("gateway-token") { cfg.RegisterProof.Token = cctx.String("gateway-token") } + + if cctx.IsSet("market-token") { + cfg.Market.Token = cctx.String("market-token") + cfg.RegisterMarket.Token = cctx.String("market-token") + } + + if cctx.IsSet("piecestorage") { + pieceStorage, err := piecestorage.ParserProtocol(cctx.String("piecestorage")) + if err != nil { + return err + } + + cfg.PieceStorage = pieceStorage + } + return nil } func parseMultiAddr(url string) error { @@ -367,68 +407,68 @@ func checkURL(cfg *config.StorageMiner) error { return nil } -func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, messagerClient api.IMessager, cfg *config.StorageMiner, ssize abi.SectorSize, gasPrice types.BigInt) error { +func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, messagerClient api.IMessager, cfg *config.StorageMiner, ssize abi.SectorSize, gasPrice types.BigInt) (address.Address, error) { log.Info("Initializing libp2p identity") repo, err := models.SetDataBase(config.HomeDir(cfg.DataDir), &cfg.DB) if err != nil { - return err + return address.Undef, err } err = repo.AutoMigrate() if err != nil { - return err + return address.Undef, err } metaDataService := service.NewMetadataService(repo) sectorInfoService := service.NewSectorInfoService(repo) p2pSk, _, err := crypto.GenerateEd25519Key(rand.Reader) if err != nil { - return xerrors.Errorf("make host key: %w", err) + return address.Undef, xerrors.Errorf("make host key: %w", err) } peerid, err := peer.IDFromPrivateKey(p2pSk) if err != nil { - return xerrors.Errorf("peer ID from private key: %w", err) + return address.Undef, xerrors.Errorf("peer ID from private key: %w", err) } var addr address.Address if act := cctx.String("actor"); act != "" { a, err := address.NewFromString(act) if err != nil { - return xerrors.Errorf("failed parsing actor flag value (%q): %w", act, err) + return address.Undef, xerrors.Errorf("failed parsing actor flag value (%q): %w", act, err) } if cctx.Bool("genesis-miner") { if err := metaDataService.SaveMinerAddress(a); err != nil { - return err + return address.Undef, err } if pssb := cctx.String("pre-sealed-metadata"); pssb != "" { pssb, err := homedir.Expand(pssb) if err != nil { - return err + return address.Undef, err } log.Infof("Importing pre-sealed sector metadata for %s", a) if err := migratePreSealMeta(ctx, api, pssb, a, metaDataService, sectorInfoService); err != nil { - return xerrors.Errorf("migrating presealed sector metadata: %w", err) + return address.Undef, xerrors.Errorf("migrating presealed sector metadata: %w", err) } } - return nil + return a, nil } if pssb := cctx.String("pre-sealed-metadata"); pssb != "" { pssb, err := homedir.Expand(pssb) if err != nil { - return err + return address.Undef, err } log.Infof("Importing pre-sealed sector metadata for %s", a) if err := migratePreSealMeta(ctx, api, pssb, a, metaDataService, sectorInfoService); err != nil { - return xerrors.Errorf("migrating presealed sector metadata: %w", err) + return address.Undef, xerrors.Errorf("migrating presealed sector metadata: %w", err) } } @@ -436,7 +476,7 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, } else { a, err := createStorageMiner(ctx, api, messagerClient, peerid, gasPrice, cctx) if err != nil { - return xerrors.Errorf("creating miner failed: %w", err) + return address.Undef, xerrors.Errorf("creating miner failed: %w", err) } addr = a @@ -445,10 +485,10 @@ func storageMinerInit(ctx context.Context, cctx *cli.Context, api api.FullNode, log.Infof("Created new miner: %s", addr) if err := metaDataService.SaveMinerAddress(addr); err != nil { - return err + return address.Undef, err } - return nil + return addr, nil } func createStorageMiner(ctx context.Context, nodeAPI api.FullNode, messagerClient api.IMessager, peerid peer.ID, gasPrice types.BigInt, cctx *cli.Context) (address.Address, error) { diff --git a/app/venus-sealer/util.go b/app/venus-sealer/util.go index 82204435..b1e378ed 100644 --- a/app/venus-sealer/util.go +++ b/app/venus-sealer/util.go @@ -88,6 +88,7 @@ var stateList = []stateMeta{ {col: color.FgBlue, state: types.Empty}, {col: color.FgBlue, state: types.WaitDeals}, + {col: color.FgBlue, state: types.AddPiece}, {col: color.FgRed, state: types.UndefinedSectorState}, {col: color.FgYellow, state: types.Packing}, @@ -96,10 +97,15 @@ var stateList = []stateMeta{ {col: color.FgYellow, state: types.PreCommit2}, {col: color.FgYellow, state: types.PreCommitting}, {col: color.FgYellow, state: types.PreCommitWait}, + {col: color.FgYellow, state: types.SubmitPreCommitBatch}, + {col: color.FgYellow, state: types.PreCommitBatchWait}, {col: color.FgYellow, state: types.WaitSeed}, {col: color.FgYellow, state: types.Committing}, + {col: color.FgYellow, state: types.CommitFinalize}, {col: color.FgYellow, state: types.SubmitCommit}, {col: color.FgYellow, state: types.CommitWait}, + {col: color.FgYellow, state: types.SubmitCommitAggregate}, + {col: color.FgYellow, state: types.CommitAggregateWait}, {col: color.FgYellow, state: types.FinalizeSector}, {col: color.FgCyan, state: types.Terminating}, @@ -110,11 +116,13 @@ var stateList = []stateMeta{ {col: color.FgCyan, state: types.Removed}, {col: color.FgRed, state: types.FailedUnrecoverable}, + {col: color.FgRed, state: types.AddPieceFailed}, {col: color.FgRed, state: types.SealPreCommit1Failed}, {col: color.FgRed, state: types.SealPreCommit2Failed}, {col: color.FgRed, state: types.PreCommitFailed}, {col: color.FgRed, state: types.ComputeProofFailed}, {col: color.FgRed, state: types.CommitFailed}, + {col: color.FgRed, state: types.CommitFinalizeFailed}, {col: color.FgRed, state: types.PackingFailed}, {col: color.FgRed, state: types.FinalizeFailed}, {col: color.FgRed, state: types.Faulty}, diff --git a/builder.go b/builder.go index b4cbacc0..f08bd714 100644 --- a/builder.go +++ b/builder.go @@ -5,6 +5,8 @@ import ( "github.com/filecoin-project/go-state-types/abi" storage2 "github.com/filecoin-project/specs-storage/storage" api2 "github.com/filecoin-project/venus-market/api" + config2 "github.com/filecoin-project/venus-market/config" + "github.com/filecoin-project/venus-market/piecestorage" "github.com/filecoin-project/venus-sealer/api" "github.com/filecoin-project/venus-sealer/api/impl" "github.com/filecoin-project/venus-sealer/config" @@ -167,6 +169,7 @@ func Repo(cfg *config.StorageMiner) Option { Override(new(sectorstorage.SealerConfig), cfg.Storage), Override(new(*storage.AddressSelector), AddressSelector(&cfg.Addresses)), Override(new(*config.DbConfig), &cfg.DB), + Override(new(*config2.PieceStorage), &cfg.PieceStorage), Override(new(*config.StorageMiner), cfg), Override(new(*config.MessagerConfig), &cfg.Messager), Override(new(*config.MarketConfig), &cfg.Market), @@ -176,6 +179,7 @@ func Repo(cfg *config.StorageMiner) Option { Override(new(api.IMessager), api.NewMessageRPC), Override(new(api2.MarketFullNode), api.NewMarketRPC), + Override(new(piecestorage.IPieceStorage), NewPieceStorage), Override(new(*market_client.MarketEventClient), market_client.NewMarketEventClient), Override(new(*proof_client.ProofEventClient), proof_client.NewProofEventClient), Override(new(repo.Repo), models.SetDataBase), diff --git a/config/def.go b/config/def.go index 38fb2c3c..e2147c5e 100644 --- a/config/def.go +++ b/config/def.go @@ -2,6 +2,7 @@ package config import ( "encoding" + "github.com/filecoin-project/venus-market/config" "net/http" "strings" "time" @@ -25,7 +26,6 @@ const ( RetrievalPricingExternalMode = "external" ) - type HomeDir string type StorageWorker struct { @@ -41,19 +41,21 @@ func (cfg StorageWorker) LocalStorage() *LocalStorage { // StorageMiner is a miner config type StorageMiner struct { - DataDir string - API API - Dealmaking DealmakingConfig - Sealing SealingConfig - Storage sectorstorage.SealerConfig - Fees MinerFeeConfig - Addresses MinerAddressConfig - NetParams NetParamsConfig - DB DbConfig - Node NodeConfig - JWT JWTConfig - Messager MessagerConfig + DataDir string + API API + Dealmaking DealmakingConfig + Sealing SealingConfig + Storage sectorstorage.SealerConfig + Fees MinerFeeConfig + Addresses MinerAddressConfig + NetParams NetParamsConfig + DB DbConfig + Node NodeConfig + JWT JWTConfig + Messager MessagerConfig + Market MarketConfig + PieceStorage config.PieceStorage RegisterProof RegisterProofConfig RegisterMarket RegisterMarketConfig diff --git a/config/local_storage.go b/config/local_storage.go index 85b43b83..6ec836fa 100644 --- a/config/local_storage.go +++ b/config/local_storage.go @@ -99,7 +99,7 @@ func (fsr *LocalStorage) SetConfig(c func(interface{})) error { fsr.configLk.Lock() defer fsr.configLk.Unlock() - cfg, err := MinerFromFile(fsr.configPath) + cfg, err := MinerFromFile(FsConfig(fsr.configPath)) if err != nil { return err } @@ -117,7 +117,7 @@ func (fsr *LocalStorage) SetConfig(c func(interface{})) error { } // write buffer of TOML bytes to config file - err = ioutil.WriteFile(fsr.configPath, buf.Bytes(), 0644) + err = ioutil.WriteFile(FsConfig(fsr.configPath), buf.Bytes(), 0644) if err != nil { return err } diff --git a/go.mod b/go.mod index 444c707b..8aeb57da 100644 --- a/go.mod +++ b/go.mod @@ -33,8 +33,8 @@ require ( github.com/filecoin-project/specs-actors/v6 v6.0.1 github.com/filecoin-project/specs-storage v0.1.1-0.20201105051918-5188d9774506 github.com/filecoin-project/venus v1.1.3-rc1 - github.com/filecoin-project/venus-market v1.0.2-0.20211116071853-d29d45417e0c - github.com/filecoin-project/venus-messager v1.2.0 + github.com/filecoin-project/venus-market v1.0.2-0.20211206084715-070874dc17eb + github.com/filecoin-project/venus-messager v1.2.2-rc1.0.20211201075617-c9dd295b905c github.com/gbrlsnchs/jwt/v3 v3.0.0 github.com/golang/mock v1.6.0 github.com/google/uuid v1.2.0 @@ -42,8 +42,8 @@ require ( github.com/hako/durafmt v0.0.0-20200710122514-c0fb7b4da026 github.com/hashicorp/go-multierror v1.1.1 github.com/icza/backscanner v0.0.0-20210726202459-ac2ffc679f94 - github.com/ipfs-force-community/venus-common-utils v0.0.0-20210924063144-1d3a5b30de87 - github.com/ipfs-force-community/venus-gateway v1.1.2-0.20210924083450-c55d3300dfc9 + github.com/ipfs-force-community/venus-common-utils v0.0.0-20211122032945-eb6cab79c62a + github.com/ipfs-force-community/venus-gateway v1.1.2-0.20211124052117-425c4a895f4a github.com/ipfs/go-block-format v0.0.3 github.com/ipfs/go-cid v0.1.0 github.com/ipfs/go-datastore v0.4.6 @@ -71,10 +71,10 @@ require ( github.com/whyrusleeping/cbor-gen v0.0.0-20210713220151-be142a5ae1a8 github.com/zbiljic/go-filelock v0.0.0-20170914061330-1dbf7103ab7d go.opencensus.io v0.23.0 - go.uber.org/fx v1.13.1 + go.uber.org/fx v1.15.0 go.uber.org/multierr v1.7.0 go.uber.org/zap v1.16.0 - golang.org/x/net v0.0.0-20210525063256-abc453219eb5 + golang.org/x/net v0.0.0-20210614182718-04defd469f4e golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6 golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 gorm.io/driver/mysql v1.1.1 diff --git a/market_client/market_event.go b/market_client/market_event.go index 60105f35..472d81b5 100644 --- a/market_client/market_event.go +++ b/market_client/market_event.go @@ -2,6 +2,8 @@ package market_client import ( "context" + "github.com/filecoin-project/venus-market/piecestorage" + "github.com/filecoin-project/venus-sealer/sector-storage/fr32" "encoding/json" "time" @@ -35,6 +37,7 @@ type MarketEvent struct { sectorBlocks *sectorblocks.SectorBlocks storageMgr *sectorstorage.Manager index stores.SectorIndex + pieceStorage piecestorage.IPieceStorage } func (e *MarketEvent) listenMarketRequest(ctx context.Context) { @@ -152,7 +155,7 @@ func (e *MarketEvent) processSectorUnsealed(ctx context.Context, reqId uuid.UUID return } - _, err = piecestorage.ReWrite(req.Dest, upr) + _, err = e.pieceStorage.SaveTo(ctx, req.Dest, upr) if err != nil { e.error(ctx, reqId, err) return diff --git a/market_client/modules.go b/market_client/modules.go index 755d7a6c..de510e29 100644 --- a/market_client/modules.go +++ b/market_client/modules.go @@ -2,6 +2,7 @@ package market_client import ( "context" + "github.com/filecoin-project/venus-market/piecestorage" "github.com/filecoin-project/venus-sealer/config" sectorstorage "github.com/filecoin-project/venus-sealer/sector-storage" "github.com/filecoin-project/venus-sealer/sector-storage/stores" @@ -10,7 +11,7 @@ import ( "go.uber.org/fx" ) -func StartMarketEvent(lc fx.Lifecycle, stor *stores.Remote, sectorBlocks *sectorblocks.SectorBlocks, storageMgr *sectorstorage.Manager, index stores.SectorIndex, mCfg *config.MarketConfig, cfg *config.RegisterMarketConfig, mAddr types2.MinerAddress) error { +func StartMarketEvent(lc fx.Lifecycle, stor *stores.Remote, pieceStorage piecestorage.IPieceStorage, sectorBlocks *sectorblocks.SectorBlocks, storageMgr *sectorstorage.Manager, index stores.SectorIndex, mCfg *config.MarketConfig, cfg *config.RegisterMarketConfig, mAddr types2.MinerAddress) error { if len(cfg.Urls) == 0 && mCfg.Url == "" { log.Warnf("register market config is empty ...") return nil @@ -32,6 +33,7 @@ func StartMarketEvent(lc fx.Lifecycle, stor *stores.Remote, sectorBlocks *sector sectorBlocks: sectorBlocks, storageMgr: storageMgr, index: index, + pieceStorage: pieceStorage, } go marketEvent.listenMarketRequest(context.Background()) } diff --git a/modules.go b/modules.go index 2243e48f..9c937c8d 100644 --- a/modules.go +++ b/modules.go @@ -5,6 +5,8 @@ import ( "crypto/rand" "encoding/hex" "errors" + config2 "github.com/filecoin-project/venus-market/config" + "github.com/filecoin-project/venus-market/piecestorage" "math" "net/http" "time" @@ -240,6 +242,10 @@ func AddressSelector(addrConf *config.MinerAddressConfig) func() (*storage.Addre } } +func NewPieceStorage(cfg *config2.PieceStorage) (piecestorage.IPieceStorage, error) { + return piecestorage.NewPieceStorage(cfg) +} + type StorageMinerParams struct { fx.In @@ -259,6 +265,7 @@ type StorageMinerParams struct { Journal journal.Journal AddrSel *storage.AddressSelector NetworkParams *config.NetParamsConfig + PieceStorage piecestorage.IPieceStorage } func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*storage.Miner, error) { @@ -280,6 +287,7 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st j = params.Journal as = params.AddrSel np = params.NetworkParams + ps = params.PieceStorage ) maddr, err := metadataService.GetMinerAddress() @@ -294,7 +302,7 @@ func StorageMiner(fc config.MinerFeeConfig) func(params StorageMinerParams) (*st return nil, err } - sm, err := storage.NewMiner(api, messager, marketClient, maddr, metadataService, sectorinfoService, logService, sealer, sc, verif, prover, gsd, fc, j, as, np) + sm, err := storage.NewMiner(api, ps, messager, marketClient, maddr, metadataService, sectorinfoService, logService, sealer, sc, verif, prover, gsd, fc, j, as, np) if err != nil { return nil, err } diff --git a/sector-storage/ffiwrapper/sealer_cgo.go b/sector-storage/ffiwrapper/sealer_cgo.go index 1f39eb45..c30c868d 100644 --- a/sector-storage/ffiwrapper/sealer_cgo.go +++ b/sector-storage/ffiwrapper/sealer_cgo.go @@ -513,6 +513,7 @@ func (sb *Sealer) SealPreCommit1(ctx context.Context, sector storage.SectorRef, log.Infof("pre commit1 paths: %v", paths) if bExist, _ := storiface.FileExists(tUnsealedFile); bExist { if bExist, _ := storiface.FileExists(paths.Unsealed); !bExist { + log.Infof("copy default unsealed file for sector %v", sector.ID.Number) err = storiface.CopyFile(tUnsealedFile, paths.Unsealed) if err != nil { return nil, err diff --git a/storage-sealing/deals.go b/storage-sealing/deals.go index a56a33ff..7d09e047 100644 --- a/storage-sealing/deals.go +++ b/storage-sealing/deals.go @@ -18,6 +18,7 @@ func (m *Sealing) DealSector(ctx context.Context) ([]types.DealAssign, error) { } log.Infof("got %d deals from venus-market", len(deals)) //read from file + var assigned []types.DealAssign for _, deal := range deals { r, err := piecestorage.Read(deal.PieceStorage) diff --git a/storage-sealing/garbage.go b/storage-sealing/garbage.go index 5f993c74..598fcc4c 100644 --- a/storage-sealing/garbage.go +++ b/storage-sealing/garbage.go @@ -67,7 +67,7 @@ func (m *Sealing) RedoSector(ctx context.Context, rsi storiface.SectorRedoParams go func() { // P1 - if err := checkPieces(ctx, m.maddr, si, m.api); err != nil { // Sanity check state + if err := checkPieces(context.TODO(), m.maddr, si, m.api); err != nil { // Sanity check state switch err.(type) { case *ErrApi: log.Errorf("handlePreCommit1: api error in sector %d, not proceeding: %+v", si.SectorNumber, err) diff --git a/storage-sealing/sealing.go b/storage-sealing/sealing.go index e5904469..ce3d711a 100644 --- a/storage-sealing/sealing.go +++ b/storage-sealing/sealing.go @@ -3,6 +3,7 @@ package sealing import ( "context" "errors" + types3 "github.com/filecoin-project/venus-market/types" "sync" "time" @@ -33,6 +34,7 @@ import ( types2 "github.com/filecoin-project/venus-sealer/types" types3 "github.com/filecoin-project/venus-market/types" + "github.com/filecoin-project/venus-market/piecestorage" ) var log = logging.Logger("sectors") @@ -126,8 +128,8 @@ type Sealing struct { precommiter *PreCommitBatcher commiter *CommitBatcher - getConfig types2.GetSealingConfigFunc - + getConfig types2.GetSealingConfigFunc + pieceStorage piecestorage.IPieceStorage //service logService *service.LogService } @@ -148,13 +150,14 @@ type pendingPiece struct { accepted func(abi.SectorNumber, abi.UnpaddedPieceSize, error) } -func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, metaDataService *service.MetadataService, sectorInfoService *service.SectorInfoService, logService *service.LogService, sealer sectorstorage.SectorManager, sc types2.SectorIDCounter, verif ffiwrapper.Verifier, prov ffiwrapper.Prover, pcp PreCommitPolicy, gc types2.GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel, networkParams *config.NetParamsConfig) *Sealing { +func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.Address, metaDataService *service.MetadataService, sectorInfoService *service.SectorInfoService, logService *service.LogService, sealer sectorstorage.SectorManager, sc types2.SectorIDCounter, verif ffiwrapper.Verifier, prov ffiwrapper.Prover, pcp PreCommitPolicy, gc types2.GetSealingConfigFunc, notifee SectorStateNotifee, as AddrSel, networkParams *config.NetParamsConfig, pieceStorage piecestorage.IPieceStorage) *Sealing { s := &Sealing{ api: api, DealInfo: &CurrentDealInfoManager{api}, feeCfg: fc, events: events, + pieceStorage: pieceStorage, networkParams: networkParams, maddr: maddr, sealer: sealer, diff --git a/storage-sealing/states_sealing.go b/storage-sealing/states_sealing.go index cc594196..cb2edb0b 100644 --- a/storage-sealing/states_sealing.go +++ b/storage-sealing/states_sealing.go @@ -77,7 +77,19 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector types.SectorInf log.Warnf("Creating %d filler pieces for sector %d", len(fillerSizes), sector.SectorNumber) } - fillerPieces, err := m.padSector(sector.SealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.ExistingPieceSizes(), fillerSizes...) + // Check if the sector is deal + var deals int = 0 + for _, piece := range sector.Pieces { + if piece.DealInfo == nil { + continue + } + + if piece.DealInfo.DealID > 0 { + deals++ + } + } + + fillerPieces, err := m.padSector(sector.SealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), deals>0, sector.ExistingPieceSizes(), fillerSizes...) if err != nil { return xerrors.Errorf("filling up the sector (%v): %w", fillerSizes, err) } @@ -85,7 +97,7 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector types.SectorInf return ctx.Send(SectorPacked{FillerPieces: fillerPieces}) } -func (m *Sealing) padSector(ctx context.Context, sectorID storage.SectorRef, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) { +func (m *Sealing) padSector(ctx context.Context, sectorID storage.SectorRef, bDeal bool, existingPieceSizes []abi.UnpaddedPieceSize, sizes ...abi.UnpaddedPieceSize) ([]abi.PieceInfo, error) { if len(sizes) == 0 { return nil, nil } @@ -98,9 +110,9 @@ func (m *Sealing) padSector(ctx context.Context, sectorID storage.SectorRef, exi } pisFile := storiface.DefaultPieceInfosFile(ssize) - if len(existingPieceSizes) == 0 { + if len(existingPieceSizes) == 0 && !bDeal { //hack for cc sector - log.Infof("pisFile: %s", pisFile) + log.Infof("use default pisFile for sector %v: %s", sectorID.ID.Number, pisFile) if bExist, _ := storiface.FileExists(pisFile); bExist { bufs, err := ioutil.ReadFile(pisFile) if err != nil { @@ -129,18 +141,20 @@ func (m *Sealing) padSector(ctx context.Context, sectorID storage.SectorRef, exi out[i] = ppi } - psFile := storiface.DefaultUnsealedFile(ssize) - if bExist, _ := storiface.FileExists(psFile); bExist { - //hack for cc sector - // save piece info to /var/tmp/s-piece-infos- - buf, err := json.Marshal(out) - if err != nil { - return nil, err - } + if !bDeal { + psFile := storiface.DefaultUnsealedFile(ssize) + if bExist, _ := storiface.FileExists(psFile); bExist { + //hack for cc sector + // save piece info to /var/tmp/s-piece-infos- + buf, err := json.Marshal(out) + if err != nil { + return nil, err + } - log.Infof("default pieceInfo file: %s, buf: %s", pisFile, buf) - if err := ioutil.WriteFile(pisFile, buf, 0644); err != nil { - return nil, xerrors.Errorf("persisting (%s): %w", pisFile, err) + log.Infof("default pieceInfo file: %s, buf: %s", pisFile, buf) + if err := ioutil.WriteFile(pisFile, buf, 0644); err != nil { + return nil, xerrors.Errorf("persisting (%s): %w", pisFile, err) + } } } diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index 6b6f09fe..54843329 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -3,6 +3,8 @@ package storage import ( "bytes" "context" + api2 "github.com/filecoin-project/venus-market/api" + types4 "github.com/filecoin-project/venus-market/types" "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" diff --git a/storage/miner.go b/storage/miner.go index 10a822da..d23a0239 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -5,6 +5,7 @@ import ( "errors" fbig "github.com/filecoin-project/go-state-types/big" api2 "github.com/filecoin-project/venus-market/api" + "github.com/filecoin-project/venus-market/piecestorage" "time" "github.com/ipfs/go-cid" @@ -51,6 +52,7 @@ var log = logging.Logger("storageminer") // // Miner#Run starts the sealing FSM. type Miner struct { + pieceStorage piecestorage.IPieceStorage messager api.IMessager marketClient api2.MarketFullNode metadataService *service.MetadataService @@ -140,6 +142,7 @@ type fullNodeFilteredAPI interface { } func NewMiner(api fullNodeFilteredAPI, + pieceStorage piecestorage.IPieceStorage, messager api.IMessager, marketClient api2.MarketFullNode, maddr address.Address, @@ -172,6 +175,7 @@ func NewMiner(api fullNodeFilteredAPI, getSealConfig: gsd, journal: journal, logService: logService, + pieceStorage: pieceStorage, sealingEvtType: journal.RegisterEventType("storage", "sealing_states"), } @@ -223,7 +227,7 @@ func (m *Miner) Run(ctx context.Context) error { // Instantiate the sealing FSM. m.sealing = sealing.New(ctx, adaptedAPI, m.feeCfg, evtsAdapter, m.maddr, m.metadataService, m.sectorInfoService, m.logService, m.sealer, m.sc, m.verif, m.prover, - &pcp, cfg, m.handleSealingNotifications, as, m.networkParams) + &pcp, cfg, m.handleSealingNotifications, as, m.networkParams, m.pieceStorage) // Run the sealing FSM. go m.sealing.Run(ctx) //nolint:errcheck // logged intside the function