Skip to content

Commit

Permalink
Merge pull request #317 from filecoin-project/opt/import-data
Browse files Browse the repository at this point in the history
Opt/import data
  • Loading branch information
hunjixin authored Apr 13, 2023
2 parents a478fb0 + b359c3c commit 3a26f5d
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 72 deletions.
6 changes: 3 additions & 3 deletions api/impl/venus_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (m *MarketNodeImpl) MarketImportDealData(ctx context.Context, propCid cid.C
}
defer fi.Close() //nolint:errcheck

return m.StorageProvider.ImportDataForDeal(ctx, propCid, fi)
return m.StorageProvider.ImportDataForDeal(ctx, propCid, fi, true)
}

func (m *MarketNodeImpl) MarketImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error {
Expand Down Expand Up @@ -1092,7 +1092,7 @@ func (m *MarketNodeImpl) UpdateDealStatus(ctx context.Context, miner address.Add
return m.DealAssigner.UpdateDealStatus(ctx, miner, dealId, pieceStatus, dealStatus)
}

func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, fname string) error {
func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, fname string, skipCommP bool) error {
deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, dealPropCid)
if err != nil {
return err
Expand All @@ -1106,7 +1106,7 @@ func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Ci
}
defer fi.Close() //nolint:errcheck

return m.StorageProvider.ImportDataForDeal(ctx, dealPropCid, fi)
return m.StorageProvider.ImportDataForDeal(ctx, dealPropCid, fi, skipCommP)
}

func (m *MarketNodeImpl) GetDeals(ctx context.Context, miner address.Address, pageIndex, pageSize int) ([]*types.DealInfo, error) {
Expand Down
24 changes: 21 additions & 3 deletions cli/storage-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,18 @@ var storageDealsCmds = &cli.Command{
}

var dealsImportDataCmd = &cli.Command{
Name: "import-data",
Usage: "Manually import data for a deal",
Name: "import-data",
Usage: "Manually import data for a deal",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "skip-commp",
Usage: "skip calculate the piece-cid, please use with caution",
},
&cli.BoolFlag{
Name: "really-do-it",
Usage: "Actually send transaction performing the action",
},
},
ArgsUsage: "<proposal CID> <file>",
Action: func(cctx *cli.Context) error {
api, closer, err := NewMarketNode(cctx)
Expand All @@ -75,7 +85,15 @@ var dealsImportDataCmd = &cli.Command{

fpath := cctx.Args().Get(1)

return api.DealsImportData(ctx, propCid, fpath)
var skipCommP bool
if cctx.IsSet("skip-commp") {
if !cctx.IsSet("really-do-it") {
return fmt.Errorf("pass --really-do-it to actually execute this action")
}
skipCommP = true
}

return api.DealsImportData(ctx, propCid, fpath, skipCommP)
},
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1
github.com/filecoin-project/venus v1.10.2-0.20230411091426-99fd1cec96ea
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e
github.com/filecoin-project/venus-messager v1.10.2-0.20230309071456-7cd8d49c6e9a
github.com/golang/mock v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893
github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0=
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1 h1:wOzhaoLA4AmfHa2ynT/KMf5GM53SSN41X8n0T+alVNw=
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1/go.mod h1:eCV4+qsHdDg7FXB8xLn5w/ay+Uu5pG3oAlPsB1nb6qU=
github.com/filecoin-project/venus v1.10.2-0.20230411091426-99fd1cec96ea h1:gp2B2xxGyM6G5FiYfcDL3vQyWtKeETOVm0afCZe5O68=
github.com/filecoin-project/venus v1.10.2-0.20230411091426-99fd1cec96ea/go.mod h1:eCV4+qsHdDg7FXB8xLn5w/ay+Uu5pG3oAlPsB1nb6qU=
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e h1:Bxpt1AzPeNxmUnFT2Y8rpabr9x0wIC0Q87DeRmjL2co=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs=
Expand Down
26 changes: 23 additions & 3 deletions storageprovider/deal_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
deal.PayloadSize = uint64(file.Size())
err = storageDealPorcess.deals.SaveDeal(ctx, deal)
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database"))
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database: %v", err))
}
err = storageDealPorcess.savePieceFile(ctx, deal, file, uint64(file.Size()))
if err := file.Close(); err != nil {
Expand All @@ -476,7 +476,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
err = fmt.Errorf("packing piece at path %s: %w", deal.PiecePath, err)
return storageDealPorcess.HandleError(ctx, deal, err)
}
} else {
} else if len(deal.InboundCAR) != 0 {
carFilePath = deal.InboundCAR

v2r, err := storageDealPorcess.ReadCAR(deal.InboundCAR)
Expand All @@ -488,7 +488,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
deal.PayloadSize = v2r.Header.DataSize
err = storageDealPorcess.deals.SaveDeal(ctx, deal)
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database"))
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database: %v", err))
}
dr, err := v2r.DataReader()
if err != nil {
Expand All @@ -507,6 +507,26 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
err = fmt.Errorf("packing piece %s: %w", deal.Ref.PieceCid, packingErr)
return storageDealPorcess.HandleError(ctx, deal, err)
}
} else {
// An index can be created even if carFilePath is empty
carFilePath = ""
// the car file may already be in piece storage; verify it
pieceStore, err := storageDealPorcess.pieceStorageMgr.FindStorageForRead(ctx, deal.Proposal.PieceCID.String())
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, err)
}
log.Debugf("found %s in piece storage", deal.Proposal.PieceCID)

l, err := pieceStore.Len(ctx, deal.Proposal.PieceCID.String())
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to got payload size: %v", err))
}

deal.PayloadSize = uint64(l)
err = storageDealPorcess.deals.SaveDeal(ctx, deal)
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database: %v", err))
}
}

// Register the deal data as a "shard" with the DAG store. Later it can be
Expand Down
159 changes: 99 additions & 60 deletions storageprovider/storage_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type StorageProvider interface {
GetStorageCollateral(ctx context.Context, mAddr address.Address) (storagemarket.Balance, error)

// ImportDataForDeal manually imports data for an offline storage deal
ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader) error
ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader, skipCommP bool) error

// ImportPublishedDeal manually import published deals to storage deals
ImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error
Expand Down Expand Up @@ -140,6 +140,7 @@ type StorageProviderImpl struct {
transferProcess IDatatransferHandler
storageReceiver smnet.StorageReceiver
minerMgr minermgr.IMinerMgr
pieceStorageMgr *piecestorage.PieceStorageManager
}

// NewStorageProvider returns a new storage provider
Expand Down Expand Up @@ -173,7 +174,8 @@ func NewStorageProvider(

dealStore: repo.StorageDealRepo(),

minerMgr: minerMgr,
minerMgr: minerMgr,
pieceStorageMgr: pieceStorageMgr,
}

dealProcess, err := NewStorageDealProcessImpl(mCtx, spV2.conns, newPeerTagger(spV2.net), spV2.spn, spV2.dealStore, spV2.storedAsk, tf, minerMgr, pieceStorageMgr, dataTransfer, dagStore, sdf, pb)
Expand Down Expand Up @@ -262,7 +264,9 @@ func (p *StorageProviderImpl) Stop() error {
// ImportDataForDeal manually imports data for an offline storage deal
// It will verify that the data in the passed io.Reader matches the expected piece
// cid for the given deal or it will error
func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader) error {
// If the car file can be found in the piece store, read it directly without copying it to the local directory.
// If skipCommP is true, do not compare piece cid.
func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader, skipCommP bool) error {
// TODO: be able to check if we have enough disk space
d, err := p.dealStore.GetDeal(ctx, propCid)
if err != nil {
Expand All @@ -277,81 +281,116 @@ func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid
return fmt.Errorf("deal %s does not support offline data", propCid)
}

fs, err := p.tf(d.Proposal.Provider)
if err != nil {
return fmt.Errorf("failed to create temp filestore for provider %s: %w", d.Proposal.Provider.String(), err)
}
var r io.Reader
var carSize int64
var piecePath filestore.Path
var cleanup = func() {}

tempfi, err := fs.CreateTemp()
if err != nil {
return fmt.Errorf("failed to create temp file for data import: %w", err)
}
defer func() {
if err := tempfi.Close(); err != nil {
log.Errorf("unable to close stream %v", err)
pieceStore, err := p.pieceStorageMgr.FindStorageForRead(ctx, d.Proposal.PieceCID.String())
if err == nil {
log.Debugf("found %v already in piece storage", d.Proposal.PieceCID)

// In order to avoid errors in the deal, the files in the piece store were deleted.
piecePath = filestore.Path("")
if carSize, err = pieceStore.Len(ctx, d.Proposal.PieceCID.String()); err != nil {
return fmt.Errorf("got piece size from piece store failed: %v", err)
}
}()
cleanup := func() {
_ = tempfi.Close()
_ = fs.Delete(tempfi.Path())
}
readerCloser, err := pieceStore.GetReaderCloser(ctx, d.Proposal.PieceCID.String())
if err != nil {
return fmt.Errorf("got reader from piece store failed: %v", err)
}
r = readerCloser

log.Debugw("will copy imported file to local file", "propCid", propCid)
n, err := io.Copy(tempfi, data)
if err != nil {
cleanup()
return fmt.Errorf("importing deal data failed: %w", err)
}
log.Debugw("finished copying imported file to local file", "propCid", propCid)
defer func() {
if err = readerCloser.Close(); err != nil {
log.Errorf("unable to close piece storage: %v, %v", d.Proposal.PieceCID, err)
}
}()
} else {
log.Debugf("not found %s in piece storage", d.Proposal.PieceCID)

_ = n // TODO: verify n?
fs, err := p.tf(d.Proposal.Provider)
if err != nil {
return fmt.Errorf("failed to create temp filestore for provider %s: %w", d.Proposal.Provider.String(), err)
}

carSize := uint64(tempfi.Size())
tempfi, err := fs.CreateTemp()
if err != nil {
return fmt.Errorf("failed to create temp file for data import: %w", err)
}
defer func() {
if err := tempfi.Close(); err != nil {
log.Errorf("unable to close stream %v", err)
}
}()
cleanup = func() {
_ = tempfi.Close()
_ = fs.Delete(tempfi.Path())
}

_, err = tempfi.Seek(0, io.SeekStart)
if err != nil {
cleanup()
return fmt.Errorf("failed to seek through temp imported file: %w", err)
}
log.Debugw("will copy imported file to local file", "propCid", propCid)
n, err := io.Copy(tempfi, data)
if err != nil {
cleanup()
return fmt.Errorf("importing deal data failed: %w", err)
}
log.Debugw("finished copying imported file to local file", "propCid", propCid)

proofType, err := p.spn.GetProofType(ctx, d.Proposal.Provider, nil) // TODO: 判断是不是属于此矿池?
if err != nil {
p.eventPublisher.Publish(storagemarket.ProviderEventNodeErrored, d)
cleanup()
return fmt.Errorf("failed to determine proof type: %w", err)
}
log.Debugw("fetched proof type", "propCid", propCid)
_ = n // TODO: verify n?

pieceCid, err := utils.GeneratePieceCommitment(proofType, tempfi, carSize)
if err != nil {
cleanup()
return fmt.Errorf("failed to generate commP: %w", err)
}
if carSizePadded := padreader.PaddedSize(carSize).Padded(); carSizePadded < d.Proposal.PieceSize {
// need to pad up!
rawPaddedCommp, err := commp.PadCommP(
// we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
pieceCid.Hash()[len(pieceCid.Hash())-32:],
uint64(carSizePadded),
uint64(d.Proposal.PieceSize),
)
carSize = tempfi.Size()
piecePath = tempfi.Path()
_, err = tempfi.Seek(0, io.SeekStart)
if err != nil {
cleanup()
return err
return fmt.Errorf("failed to seek through temp imported file: %w", err)
}
pieceCid, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)

r = tempfi
}

// Verify CommP matches
if !pieceCid.Equals(d.Proposal.PieceCID) {
cleanup()
return fmt.Errorf("given data does not match expected commP (got: %s, expected %s)", pieceCid, d.Proposal.PieceCID)
if !skipCommP {
log.Debugf("will calculate piece cid")

proofType, err := p.spn.GetProofType(ctx, d.Proposal.Provider, nil) // TODO: 判断是不是属于此矿池?
if err != nil {
p.eventPublisher.Publish(storagemarket.ProviderEventNodeErrored, d)
cleanup()
return fmt.Errorf("failed to determine proof type: %w", err)
}
log.Debugw("fetched proof type", "propCid", propCid)

pieceCid, err := utils.GeneratePieceCommitment(proofType, r, uint64(carSize))
if err != nil {
cleanup()
return fmt.Errorf("failed to generate commP: %w", err)
}
if carSizePadded := padreader.PaddedSize(uint64(carSize)).Padded(); carSizePadded < d.Proposal.PieceSize {
// need to pad up!
rawPaddedCommp, err := commp.PadCommP(
// we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
pieceCid.Hash()[len(pieceCid.Hash())-32:],
uint64(carSizePadded),
uint64(d.Proposal.PieceSize),
)
if err != nil {
cleanup()
return err
}
pieceCid, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
}

// Verify CommP matches
if !pieceCid.Equals(d.Proposal.PieceCID) {
cleanup()
return fmt.Errorf("given data does not match expected commP (got: %s, expected %s)", pieceCid, d.Proposal.PieceCID)
}
}

log.Debugw("will fire ReserveProviderFunds for imported file", "propCid", propCid)

// "will fire VerifiedData for imported file
d.PiecePath = tempfi.Path()
d.PiecePath = piecePath
d.MetadataPath = filestore.Path("")
log.Infof("deal %s piece path: %s", propCid, d.PiecePath)

Expand Down

0 comments on commit 3a26f5d

Please sign in to comment.