Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move dag import and export into the local node CoreAPI implementation #8076

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions core/commands/dag/dag.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package dagcmd

import (
"fmt"
"github.com/ipfs/go-ipfs/core/coreapi"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: group with other go-ipfs imports

"io"

"github.com/ipfs/go-ipfs/core/commands/cmdenv"
Expand Down Expand Up @@ -55,13 +56,7 @@ type ResolveOutput struct {

// CarImportOutput is the output type of the 'dag import' commands
type CarImportOutput struct {
Root RootMeta
}

// RootMeta is the metadata for a root pinning response
type RootMeta struct {
Cid cid.Cid
PinErrorMsg string
Root coreapi.RootMeta
}

// DagPutCmd is a command for adding a dag node
Expand Down Expand Up @@ -156,11 +151,6 @@ var DagResolveCmd = &cmds.Command{
Type: ResolveOutput{},
}

type importResult struct {
roots map[cid.Cid]struct{}
err error
}

// DagImportCmd is a command for importing a car to ipfs
var DagImportCmd = &cmds.Command{
Helptext: cmds.HelpText{
Expand Down
61 changes: 14 additions & 47 deletions core/commands/dag/export.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package dagcmd

import (
"context"
"errors"
"fmt"
"io"
Expand All @@ -9,14 +10,15 @@ import (

"github.com/cheggaaa/pb"
cid "github.com/ipfs/go-cid"
cmds "github.com/ipfs/go-ipfs-cmds"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"
ipld "github.com/ipfs/go-ipld-format"
mdag "github.com/ipfs/go-merkledag"

cmds "github.com/ipfs/go-ipfs-cmds"
gocar "github.com/ipld/go-car"
)

type dagExportAPI interface {
Export(ctx context.Context, c cid.Cid) (io.ReadCloser, <-chan error)
}

func dagExport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {

c, err := cid.Decode(req.Arguments[0])
Expand All @@ -32,50 +34,15 @@ func dagExport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
return err
}

// Code disabled until descent-issue in go-ipld-prime is fixed
// https://github.com/ribasushi/gip-muddle-up
//
// sb := gipselectorbuilder.NewSelectorSpecBuilder(gipfree.NodeBuilder())
// car := gocar.NewSelectiveCar(
// req.Context,
// <needs to be fixed to take format.NodeGetter as well>,
// []gocar.Dag{gocar.Dag{
// Root: c,
// Selector: sb.ExploreRecursive(
// gipselector.RecursionLimitNone(),
// sb.ExploreAll(sb.ExploreRecursiveEdge()),
// ).Node(),
// }},
// )
// ...
// if err := car.Write(pipeW); err != nil {}

pipeR, pipeW := io.Pipe()

errCh := make(chan error, 2) // we only report the 1st error
go func() {
defer func() {
if err := pipeW.Close(); err != nil {
errCh <- fmt.Errorf("stream flush failed: %s", err)
}
close(errCh)
}()

if err := gocar.WriteCar(
req.Context,
mdag.NewSession(
req.Context,
api.Dag(),
),
[]cid.Cid{c},
pipeW,
); err != nil {
errCh <- err
}
}()
dagExport, ok := api.Dag().(dagExportAPI)
if !ok {
return fmt.Errorf("API does not support DAG export")
}

rc, errCh := dagExport.Export(req.Context, c)

if err := res.Emit(pipeR); err != nil {
pipeR.Close() // ignore the error if any
if err := res.Emit(rc); err != nil {
rc.Close() // ignore the error if any
return err
}

Expand Down
189 changes: 30 additions & 159 deletions core/commands/dag/import.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@
package dagcmd

import (
"errors"
"context"
"fmt"
"io"

cid "github.com/ipfs/go-cid"
files "github.com/ipfs/go-ipfs-files"
"github.com/ipfs/go-ipfs/core/commands/cmdenv"
ipld "github.com/ipfs/go-ipld-format"
iface "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/go-ipfs/core/coreapi"
"github.com/ipfs/interface-go-ipfs-core/options"

cmds "github.com/ipfs/go-ipfs-cmds"
gocar "github.com/ipld/go-car"
)

func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {

node, err := cmdenv.GetNode(env)
if err != nil {
return err
}
type dagImportAPI interface {
ImportMany(ctx context.Context, directory files.Directory, pinRoots bool) (<-chan coreapi.RootMeta, <-chan error)
}

func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment) error {
api, err := cmdenv.GetApi(env, req)
if err != nil {
return err
Expand All @@ -36,166 +29,44 @@ func dagImport(req *cmds.Request, res cmds.ResponseEmitter, env cmds.Environment
return err
}

// grab a pinlock ( which doubles as a GC lock ) so that regardless of the
// size of the streamed-in cars nothing will disappear on us before we had
// a chance to roots that may show up at the very end
// This is especially important for use cases like dagger:
// ipfs dag import $( ... | ipfs-dagger --stdout=carfifos )
//
unlocker := node.Blockstore.PinLock()
defer unlocker.Unlock()
dagapi, ok := api.Dag().(dagImportAPI)
if !ok {
return fmt.Errorf("API does not support DAG import")
}

doPinRoots, _ := req.Options[pinRootsOptionName].(bool)

retCh := make(chan importResult, 1)
go importWorker(req, res, api, retCh)
resCh, errCh := dagapi.ImportMany(req.Context, req.Files, doPinRoots)

done := <-retCh
if done.err != nil {
return done.err
err = <-errCh
if err != nil {
return err
}

// It is not guaranteed that a root in a header is actually present in the same ( or any )
// .car file. This is the case in version 1, and ideally in further versions too
// Accumulate any root CID seen in a header, and supplement its actual node if/when encountered
// We will attempt a pin *only* at the end in case all car files were well formed
//
// The boolean value indicates whether we have encountered the root within the car file's
roots := done.roots

// opportunistic pinning: try whatever sticks
if doPinRoots {

var failedPins int
for c := range roots {

// We need to re-retrieve a block, convert it to ipld, and feed it
// to the Pinning interface, sigh...
//
// If we didn't have the problem of inability to take multiple pinlocks,
// we could use the api directly like so (though internally it does the same):
//
// // not ideal, but the pinning api takes only paths :(
// rp := path.NewResolvedPath(
// ipfspath.FromCid(c),
// c,
// c,
// "",
// )
//
// if err := api.Pin().Add(req.Context, rp, options.Pin.Recursive(true)); err != nil {

ret := RootMeta{Cid: c}

if block, err := node.Blockstore.Get(c); err != nil {
ret.PinErrorMsg = err.Error()
} else if nd, err := ipld.Decode(block); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Pin(req.Context, nd, true); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := node.Pinning.Flush(req.Context); err != nil {
ret.PinErrorMsg = err.Error()
}

if ret.PinErrorMsg != "" {
failedPins++
}

if err := res.Emit(&CarImportOutput{Root: ret}); err != nil {
return err
}
}

if failedPins > 0 {
return fmt.Errorf(
"unable to pin all roots: %d out of %d failed",
failedPins,
len(roots),
)
}
if !doPinRoots {
return nil
}

return nil
}

func importWorker(req *cmds.Request, re cmds.ResponseEmitter, api iface.CoreAPI, ret chan importResult) {
var failedPins, totalRoots int

// this is *not* a transaction
// it is simply a way to relieve pressure on the blockstore
// similar to pinner.Pin/pinner.Flush
batch := ipld.NewBatch(req.Context, api.Dag())

roots := make(map[cid.Cid]struct{})

it := req.Files.Entries()
for it.Next() {

file := files.FileFromEntry(it)
if file == nil {
ret <- importResult{err: errors.New("expected a file handle")}
return
for ret := range resCh {
if ret.PinErrorMsg != "" {
failedPins++
}
totalRoots++

// wrap a defer-closer-scope
//
// every single file in it() is already open before we start
// just close here sooner rather than later for neatness
// and to surface potential errors writing on closed fifos
// this won't/can't help with not running out of handles
err := func() error {
defer file.Close()

car, err := gocar.NewCarReader(file)
if err != nil {
return err
}

// Be explicit here, until the spec is finished
if car.Header.Version != 1 {
return errors.New("only car files version 1 supported at present")
}

for _, c := range car.Header.Roots {
roots[c] = struct{}{}
}

for {
block, err := car.Next()
if err != nil && err != io.EOF {
return err
} else if block == nil {
break
}

// the double-decode is suboptimal, but we need it for batching
nd, err := ipld.Decode(block)
if err != nil {
return err
}

if err := batch.Add(req.Context, nd); err != nil {
return err
}
}

return nil
}()

if err != nil {
ret <- importResult{err: err}
return
if err := res.Emit(&CarImportOutput{Root: ret}); err != nil {
return err
}
}

if err := it.Err(); err != nil {
ret <- importResult{err: err}
return
}

if err := batch.Commit(); err != nil {
ret <- importResult{err: err}
return
if doPinRoots && failedPins > 0 {
return fmt.Errorf(
"unable to pin all roots: %d out of %d failed",
failedPins,
totalRoots,
)
}

ret <- importResult{roots: roots}
return nil
}
Loading