Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Opt/import data #317

Merged
merged 4 commits into from
Apr 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions api/impl/venus_market.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (m *MarketNodeImpl) MarketImportDealData(ctx context.Context, propCid cid.C
}
defer fi.Close() //nolint:errcheck

return m.StorageProvider.ImportDataForDeal(ctx, propCid, fi)
return m.StorageProvider.ImportDataForDeal(ctx, propCid, fi, true)
}

func (m *MarketNodeImpl) MarketImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error {
Expand Down Expand Up @@ -1092,7 +1092,7 @@ func (m *MarketNodeImpl) UpdateDealStatus(ctx context.Context, miner address.Add
return m.DealAssigner.UpdateDealStatus(ctx, miner, dealId, pieceStatus, dealStatus)
}

func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, fname string) error {
func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Cid, fname string, skipCommP bool) error {
deal, err := m.Repo.StorageDealRepo().GetDeal(ctx, dealPropCid)
if err != nil {
return err
Expand All @@ -1106,7 +1106,7 @@ func (m *MarketNodeImpl) DealsImportData(ctx context.Context, dealPropCid cid.Ci
}
defer fi.Close() //nolint:errcheck

return m.StorageProvider.ImportDataForDeal(ctx, dealPropCid, fi)
return m.StorageProvider.ImportDataForDeal(ctx, dealPropCid, fi, skipCommP)
}

func (m *MarketNodeImpl) GetDeals(ctx context.Context, miner address.Address, pageIndex, pageSize int) ([]*types.DealInfo, error) {
Expand Down
24 changes: 21 additions & 3 deletions cli/storage-deals.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,18 @@ var storageDealsCmds = &cli.Command{
}

var dealsImportDataCmd = &cli.Command{
Name: "import-data",
Usage: "Manually import data for a deal",
Name: "import-data",
Usage: "Manually import data for a deal",
Flags: []cli.Flag{
&cli.BoolFlag{
Name: "skip-commp",
Usage: "skip calculate the piece-cid, please use with caution",
},
&cli.BoolFlag{
Name: "really-do-it",
Usage: "Actually send transaction performing the action",
},
},
ArgsUsage: "<proposal CID> <file>",
Action: func(cctx *cli.Context) error {
api, closer, err := NewMarketNode(cctx)
Expand All @@ -75,7 +85,15 @@ var dealsImportDataCmd = &cli.Command{

fpath := cctx.Args().Get(1)

return api.DealsImportData(ctx, propCid, fpath)
var skipCommP bool
if cctx.IsSet("skip-commp") {
if !cctx.IsSet("really-do-it") {
return fmt.Errorf("pass --really-do-it to actually execute this action")
}
skipCommP = true
}

return api.DealsImportData(ctx, propCid, fpath, skipCommP)
},
}

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/filecoin-project/go-statestore v0.2.0
github.com/filecoin-project/specs-actors/v2 v2.3.6
github.com/filecoin-project/specs-actors/v7 v7.0.1
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1
github.com/filecoin-project/venus v1.10.2-0.20230411091426-99fd1cec96ea
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e
github.com/filecoin-project/venus-messager v1.10.2-0.20230309071456-7cd8d49c6e9a
github.com/golang/mock v1.6.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893
github.com/filecoin-project/storetheindex v0.4.30-0.20221114113647-683091f8e893/go.mod h1:S7590oDimBvXMUtzWsBXoshu9HtYKwtXl47zAK9rcP8=
github.com/filecoin-project/test-vectors/schema v0.0.5/go.mod h1:iQ9QXLpYWL3m7warwvK1JC/pTri8mnfEmKygNDqqY6E=
github.com/filecoin-project/venus v1.2.4/go.mod h1:hJULXHGAnWuq5S5KRtPkwbT8DqgM9II7NwyNU7t59D0=
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1 h1:wOzhaoLA4AmfHa2ynT/KMf5GM53SSN41X8n0T+alVNw=
github.com/filecoin-project/venus v1.10.2-0.20230330090548-2e3f39feceb1/go.mod h1:eCV4+qsHdDg7FXB8xLn5w/ay+Uu5pG3oAlPsB1nb6qU=
github.com/filecoin-project/venus v1.10.2-0.20230411091426-99fd1cec96ea h1:gp2B2xxGyM6G5FiYfcDL3vQyWtKeETOVm0afCZe5O68=
github.com/filecoin-project/venus v1.10.2-0.20230411091426-99fd1cec96ea/go.mod h1:eCV4+qsHdDg7FXB8xLn5w/ay+Uu5pG3oAlPsB1nb6qU=
github.com/filecoin-project/venus-auth v1.3.2/go.mod h1:m5Jog2GYxztwP7w3m/iJdv/V1/bTcAVU9rm/CbhxRQU=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e h1:Bxpt1AzPeNxmUnFT2Y8rpabr9x0wIC0Q87DeRmjL2co=
github.com/filecoin-project/venus-auth v1.10.2-0.20230308100319-913815325d5e/go.mod h1:aBfIfNxQkdcY8Rk5wrQn9qRtJpH4RTDdc10Ac+ferzs=
Expand Down
26 changes: 23 additions & 3 deletions storageprovider/deal_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
deal.PayloadSize = uint64(file.Size())
err = storageDealPorcess.deals.SaveDeal(ctx, deal)
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database"))
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database: %v", err))
}
err = storageDealPorcess.savePieceFile(ctx, deal, file, uint64(file.Size()))
if err := file.Close(); err != nil {
Expand All @@ -476,7 +476,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
err = fmt.Errorf("packing piece at path %s: %w", deal.PiecePath, err)
return storageDealPorcess.HandleError(ctx, deal, err)
}
} else {
} else if len(deal.InboundCAR) != 0 {
carFilePath = deal.InboundCAR

v2r, err := storageDealPorcess.ReadCAR(deal.InboundCAR)
Expand All @@ -488,7 +488,7 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
deal.PayloadSize = v2r.Header.DataSize
err = storageDealPorcess.deals.SaveDeal(ctx, deal)
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database"))
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database: %v", err))
}
dr, err := v2r.DataReader()
if err != nil {
Expand All @@ -507,6 +507,26 @@ func (storageDealPorcess *StorageDealProcessImpl) HandleOff(ctx context.Context,
err = fmt.Errorf("packing piece %s: %w", deal.Ref.PieceCid, packingErr)
return storageDealPorcess.HandleError(ctx, deal, err)
}
} else {
// An index can be created even if carFilePath is empty
carFilePath = ""
// carfile may already in piece storage, verify it
pieceStore, err := storageDealPorcess.pieceStorageMgr.FindStorageForRead(ctx, deal.Proposal.PieceCID.String())
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, err)
}
log.Debugf("found %s in piece storage", deal.Proposal.PieceCID)

l, err := pieceStore.Len(ctx, deal.Proposal.PieceCID.String())
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to got payload size: %v", err))
}

deal.PayloadSize = uint64(l)
err = storageDealPorcess.deals.SaveDeal(ctx, deal)
if err != nil {
return storageDealPorcess.HandleError(ctx, deal, fmt.Errorf("fail to save deal to database: %v", err))
}
}

// Register the deal data as a "shard" with the DAG store. Later it can be
Expand Down
159 changes: 99 additions & 60 deletions storageprovider/storage_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ type StorageProvider interface {
GetStorageCollateral(ctx context.Context, mAddr address.Address) (storagemarket.Balance, error)

// ImportDataForDeal manually imports data for an offline storage deal
ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader) error
ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader, skipCommP bool) error

// ImportPublishedDeal manually import published deals to storage deals
ImportPublishedDeal(ctx context.Context, deal types.MinerDeal) error
Expand Down Expand Up @@ -140,6 +140,7 @@ type StorageProviderImpl struct {
transferProcess IDatatransferHandler
storageReceiver smnet.StorageReceiver
minerMgr minermgr.IMinerMgr
pieceStorageMgr *piecestorage.PieceStorageManager
}

// NewStorageProvider returns a new storage provider
Expand Down Expand Up @@ -173,7 +174,8 @@ func NewStorageProvider(

dealStore: repo.StorageDealRepo(),

minerMgr: minerMgr,
minerMgr: minerMgr,
pieceStorageMgr: pieceStorageMgr,
}

dealProcess, err := NewStorageDealProcessImpl(mCtx, spV2.conns, newPeerTagger(spV2.net), spV2.spn, spV2.dealStore, spV2.storedAsk, tf, minerMgr, pieceStorageMgr, dataTransfer, dagStore, sdf, pb)
Expand Down Expand Up @@ -262,7 +264,9 @@ func (p *StorageProviderImpl) Stop() error {
// ImportDataForDeal manually imports data for an offline storage deal
// It will verify that the data in the passed io.Reader matches the expected piece
// cid for the given deal or it will error
func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader) error {
// If can find the car file from the piece store, read it directly without copying the car file to the local directory.
// If skipCommP is true, do not compare piece cid.
func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid.Cid, data io.Reader, skipCommP bool) error {
// TODO: be able to check if we have enough disk space
d, err := p.dealStore.GetDeal(ctx, propCid)
if err != nil {
Expand All @@ -277,81 +281,116 @@ func (p *StorageProviderImpl) ImportDataForDeal(ctx context.Context, propCid cid
return fmt.Errorf("deal %s does not support offline data", propCid)
}

fs, err := p.tf(d.Proposal.Provider)
if err != nil {
return fmt.Errorf("failed to create temp filestore for provider %s: %w", d.Proposal.Provider.String(), err)
}
var r io.Reader
var carSize int64
var piecePath filestore.Path
var cleanup = func() {}

tempfi, err := fs.CreateTemp()
if err != nil {
return fmt.Errorf("failed to create temp file for data import: %w", err)
}
defer func() {
if err := tempfi.Close(); err != nil {
log.Errorf("unable to close stream %v", err)
pieceStore, err := p.pieceStorageMgr.FindStorageForRead(ctx, d.Proposal.PieceCID.String())
if err == nil {
log.Debugf("found %v already in piece storage", d.Proposal.PieceCID)

// In order to avoid errors in the deal, the files in the piece store were deleted.
piecePath = filestore.Path("")
if carSize, err = pieceStore.Len(ctx, d.Proposal.PieceCID.String()); err != nil {
return fmt.Errorf("got piece size from piece store failed: %v", err)
}
}()
cleanup := func() {
_ = tempfi.Close()
_ = fs.Delete(tempfi.Path())
}
readerCloser, err := pieceStore.GetReaderCloser(ctx, d.Proposal.PieceCID.String())
if err != nil {
return fmt.Errorf("got reader from piece store failed: %v", err)
}
r = readerCloser

log.Debugw("will copy imported file to local file", "propCid", propCid)
n, err := io.Copy(tempfi, data)
if err != nil {
cleanup()
return fmt.Errorf("importing deal data failed: %w", err)
}
log.Debugw("finished copying imported file to local file", "propCid", propCid)
defer func() {
if err = readerCloser.Close(); err != nil {
log.Errorf("unable to close piece storage: %v, %v", d.Proposal.PieceCID, err)
}
}()
} else {
log.Debugf("not found %s in piece storage", d.Proposal.PieceCID)

_ = n // TODO: verify n?
fs, err := p.tf(d.Proposal.Provider)
if err != nil {
return fmt.Errorf("failed to create temp filestore for provider %s: %w", d.Proposal.Provider.String(), err)
}

carSize := uint64(tempfi.Size())
tempfi, err := fs.CreateTemp()
if err != nil {
return fmt.Errorf("failed to create temp file for data import: %w", err)
}
defer func() {
if err := tempfi.Close(); err != nil {
log.Errorf("unable to close stream %v", err)
}
}()
cleanup = func() {
_ = tempfi.Close()
_ = fs.Delete(tempfi.Path())
}

_, err = tempfi.Seek(0, io.SeekStart)
if err != nil {
cleanup()
return fmt.Errorf("failed to seek through temp imported file: %w", err)
}
log.Debugw("will copy imported file to local file", "propCid", propCid)
n, err := io.Copy(tempfi, data)
if err != nil {
cleanup()
return fmt.Errorf("importing deal data failed: %w", err)
}
log.Debugw("finished copying imported file to local file", "propCid", propCid)

proofType, err := p.spn.GetProofType(ctx, d.Proposal.Provider, nil) // TODO: 判断是不是属于此矿池?
if err != nil {
p.eventPublisher.Publish(storagemarket.ProviderEventNodeErrored, d)
cleanup()
return fmt.Errorf("failed to determine proof type: %w", err)
}
log.Debugw("fetched proof type", "propCid", propCid)
_ = n // TODO: verify n?

pieceCid, err := utils.GeneratePieceCommitment(proofType, tempfi, carSize)
if err != nil {
cleanup()
return fmt.Errorf("failed to generate commP: %w", err)
}
if carSizePadded := padreader.PaddedSize(carSize).Padded(); carSizePadded < d.Proposal.PieceSize {
// need to pad up!
rawPaddedCommp, err := commp.PadCommP(
// we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
pieceCid.Hash()[len(pieceCid.Hash())-32:],
uint64(carSizePadded),
uint64(d.Proposal.PieceSize),
)
carSize = tempfi.Size()
piecePath = tempfi.Path()
_, err = tempfi.Seek(0, io.SeekStart)
if err != nil {
cleanup()
return err
return fmt.Errorf("failed to seek through temp imported file: %w", err)
}
pieceCid, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)

r = tempfi
}

// Verify CommP matches
if !pieceCid.Equals(d.Proposal.PieceCID) {
cleanup()
return fmt.Errorf("given data does not match expected commP (got: %s, expected %s)", pieceCid, d.Proposal.PieceCID)
if !skipCommP {
log.Debugf("will calculate piece cid")

proofType, err := p.spn.GetProofType(ctx, d.Proposal.Provider, nil) // TODO: 判断是不是属于此矿池?
if err != nil {
p.eventPublisher.Publish(storagemarket.ProviderEventNodeErrored, d)
cleanup()
return fmt.Errorf("failed to determine proof type: %w", err)
}
log.Debugw("fetched proof type", "propCid", propCid)

pieceCid, err := utils.GeneratePieceCommitment(proofType, r, uint64(carSize))
if err != nil {
cleanup()
return fmt.Errorf("failed to generate commP: %w", err)
}
if carSizePadded := padreader.PaddedSize(uint64(carSize)).Padded(); carSizePadded < d.Proposal.PieceSize {
// need to pad up!
rawPaddedCommp, err := commp.PadCommP(
// we know how long a pieceCid "hash" is, just blindly extract the trailing 32 bytes
pieceCid.Hash()[len(pieceCid.Hash())-32:],
uint64(carSizePadded),
uint64(d.Proposal.PieceSize),
)
if err != nil {
cleanup()
return err
}
pieceCid, _ = commcid.DataCommitmentV1ToCID(rawPaddedCommp)
}

// Verify CommP matches
if !pieceCid.Equals(d.Proposal.PieceCID) {
cleanup()
return fmt.Errorf("given data does not match expected commP (got: %s, expected %s)", pieceCid, d.Proposal.PieceCID)
}
}

log.Debugw("will fire ReserveProviderFunds for imported file", "propCid", propCid)

// "will fire VerifiedData for imported file
d.PiecePath = tempfi.Path()
d.PiecePath = piecePath
d.MetadataPath = filestore.Path("")
log.Infof("deal %s piece path: %s", propCid, d.PiecePath)

Expand Down