Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move dag import and export into the local node CoreAPI implementation #8076

Open
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

aschmahmann
Copy link
Contributor

This is an attempt to make importing/exporting CAR files into go-ipfs generally more usable before we extend the CoreAPI to include it.

The immediate use is to just unlock making it possible to do CAR import/export without going through the commands lib and either the CLI or HTTP API.

@@ -2,6 +2,7 @@ package dagcmd

import (
"fmt"
"github.com/ipfs/go-ipfs/core/coreapi"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: group with other go-ipfs imports

// occurred when importing each root will be returned as well.
//
// This function's API is not yet stable
func (api *dagAPI) ImportMany(ctx context.Context, directory files.Directory, doPinRoots bool) (<-chan RootMeta, <-chan error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really not a fan of this function signature, but it does emulate what we had before. Up for suggestions, or maybe we just merge this PR since it's better than what we've got and work on improving this later. Marked this as unstable to mark my dissatisfaction.

The use of files.Directory seems pretty suspect here too, should it be some typed iterator/slice/channel of ReaderClosers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A simpler Import function could also be added here separate from what's used by the commands lib.


// Export exports the DAG rooted at a given CID into a CAR file. The error channel will return at most one error and
// the reader is usable before the error channel has been closed.
func (api *dagAPI) Export(ctx context.Context, c cid.Cid) (io.ReadCloser, <-chan error) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm much happier with the Export API, although returning error channels still doesn't feel great here. Up for suggestions.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Instead of returning an error channel, what about calling CloseWithError on pipeW. That way the caller will get the error when trying to read from the io.ReadCloser returned here, if an error happens before reading is finished.
Example: https://play.golang.org/p/5caJVZb869-

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, that looks useful! @ribasushi do you recall any reason we didn't do this before (e.g. it didn't play well with the commands lib emitter)?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I simply didn't know about CloseWithError at the time: 👍

Comment on lines +294 to +296
if closeErr := pipeW.Close(); closeErr != nil && err == nil {
errCh <- fmt.Errorf("stream flush failed: %s", closeErr)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I kept the code mostly the same, except that we now only return a single error instead of returning two errors assuming that the caller will only process one of them which is what we did before. This required moving this pipe closure + error checking out of the defer statement.

We could also just wait on both functions to complete and return a multierror.

ret.PinErrorMsg = err.Error()
} else if err := api.core.pinning.Pin(ctx, nd, true); err != nil {
ret.PinErrorMsg = err.Error()
} else if err := api.core.pinning.Flush(ctx); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like it would be better to do once after as a defered function call.


// Export exports the DAG rooted at a given CID into a CAR file. The error channel will return at most one error and
// the reader is usable before the error channel has been closed.
func (api *dagAPI) Export(ctx context.Context, c cid.Cid) (io.ReadCloser, <-chan error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggestion: Instead of returning an error channel, what about calling CloseWithError on pipeW. That way the caller will get the error when trying to read from the io.ReadCloser returned here, if an error happens before reading is finished.
Example: https://play.golang.org/p/5caJVZb869-

roots := make(map[cid.Cid]struct{})

it := dir.Entries()
for it.Next() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this benefit from starting up a number of goroutines (one per entry) and doing the imports concurrently?

Copy link
Contributor Author

@aschmahmann aschmahmann Apr 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Almost certainly (up until some limit of worker nodes).

Also, this code does not properly respect contexts. If for some reason the batching datastore underneath does not return an error on batch.Add(ctx, node) we won't abort during this process.

Round one here was trying to make sure I copy-pasted the code correctly since some edits were made to fit the slightly modified functions required. Need some eyes to make sure I've done that part right.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One could certainly do things here in parallel, but be careful not switching to parallel full-on imports, as this will break a use case hat goes something like:
ipfs dag import {{named-fifo-pipe-containing-blocks-and-NO-ROOTS-to-pin}} {{named-fifo-pipe-containing-NO-BLOCKS-and-a-list-of-roots-to-pin-based-on-previous-fifo}}

Copy link
Contributor

@ribasushi ribasushi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The actual code-migration LGTM, perhaps further improvements ( paralellisation ) could land in subsequent PRs

👍 either way

@aschmahmann
Copy link
Contributor Author

I think I introduced a bug somewhere here since there's a test that's hanging. Need to poke around to find out what I messed up, before merging 😄

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants