Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

always try to read ahead by at least 5 blocks in the PBDagReader #5162

Merged
merged 5 commits into from
Jul 17, 2018
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions unixfs/io/dagreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,41 @@ func TestSeekAndReadLarge(t *testing.T) {
}
}

func TestReadAndCancel(t *testing.T) {
dserv := testu.GetDAGServ()
inbuf := make([]byte, 20000)
rand.Read(inbuf)

node := testu.GetNode(t, dserv, inbuf, testu.UseProtoBufLeaves)
ctx, closer := context.WithCancel(context.Background())
defer closer()

reader, err := NewDagReader(ctx, node, dserv)
if err != nil {
t.Fatal(err)
}

ctx, cancel := context.WithCancel(context.Background())
buf := make([]byte, 100)
_, err = reader.CtxReadFull(ctx, buf)
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(buf, inbuf[0:100]) {
t.Fatal("read failed")
}
cancel()

b, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatal(err)
}

if !bytes.Equal(inbuf[100:], b) {
t.Fatal("buffers not equal")
}
}

func TestRelativeSeek(t *testing.T) {
dserv := testu.GetDAGServ()
ctx, closer := context.WithCancel(context.Background())
Expand Down
37 changes: 28 additions & 9 deletions unixfs/io/pbdagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,13 @@ func NewPBFileReader(ctx context.Context, n *mdag.ProtoNode, pb *ftpb.Data, serv

const preloadSize = 10

func (dr *PBDagReader) preloadNextNodes(ctx context.Context) {
beg := dr.linkPosition
func (dr *PBDagReader) preload(ctx context.Context, beg int) {
end := beg + preloadSize
if end >= len(dr.links) {
end = len(dr.links)
}

for i, p := range ipld.GetNodes(ctx, dr.serv, dr.links[beg:end]) {
dr.promises[beg+i] = p
}
copy(dr.promises[beg:], ipld.GetNodes(ctx, dr.serv, dr.links[beg:end]))
}

// precalcNextBuf follows the next link in line and loads it from the
Expand All @@ -92,15 +89,37 @@ func (dr *PBDagReader) precalcNextBuf(ctx context.Context) error {
return io.EOF
}

if dr.promises[dr.linkPosition] == nil {
dr.preloadNextNodes(ctx)
// If we drop to <= preloadSize/2 preloading nodes, preload the next 10.
for i := dr.linkPosition; i < dr.linkPosition+preloadSize/2 && i < len(dr.promises); i++ {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Apologies in advanced for the noise if I'm misunderstanding what this code does.)

So, if I'm understanding this correctly, right now the number of preloaded nodes available in advance goes like 10, 9, 8, ..., 0, all of a sudden the algorithm realizes it's out of nodes and has to load 10 more, causing the stuttering.

In that case, could we just move the for logic inside preload, calling it here every time without checking if promises[dr.linkPosition] is nil, so as to say: "I've consumed one node, hey preload(), make sure I still have preloadSize nodes available in front of me, you figure out what to do".

Maybe preload wouldn't even need the for, in the first call, dr.promises[0] == nil, load preloadSize nodes altogether, and after that, every time it's called (because a node has been consumed), preload the node dr.promises[dr.linkPosition + preloadSize] (assuming here that every node before dr.linkPosition + preloadSize is already loaded).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've just realized that all the time I was talking about nodes I should have been talking about node promises, I can't know if there's a node until I call Get() right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"I've consumed one node, hey preload(), make sure I still have preloadSize nodes available in front of me, you figure out what to do".

Sounds reasonable.

assuming here that every node before dr.linkPosition + preloadSize is already loaded

Technically, you can seek around so I'd rather be robust (the check is cheap).

I've just realized that all the time I was talking about nodes I should have been talking about node promises, I can't know if there's a node until I call Get() right?

No, but you can know if we've made a request for the node (i.e., the promise exists).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, it looks like making that change makes the other call to preload trickier. That second call currently overwrites any existing promises assuming that if the first preload has been canceled, the later ones probably have been as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that fetching multiple blocks at once is more efficient.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that fetching multiple blocks at once is more efficient.

Oh, so maybe another constant N should be added that would indicate to call preload only if there are less than N available promises, to avoid calling it every time a node is consumed, e.g., if N is 5 then the number of available promises in advance would go like 10, 9, ..., 5, only now call preload, 15, 14, ..., 5, call preload again, etc.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That second call currently overwrites any existing promises assuming that if the first preload has been canceled, the later ones probably have been as well.

Maybe preload() could be extended to also check if the context of dr.promises[i] has been cancelled, not only that it's nil. (Another argument could be added to preload() to indicate if we want to overwrite the promises, but the other solution sound more correct to me.)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, so maybe another constant N should be added that would indicate to call preload only if there are less than N available promises, to avoid calling it every time a node is consumed, e.g., if N is 5 then the number of available promises in advance would go like 10, 9, ..., 5, only now call preload, 15, 14, ..., 5, call preload again, etc.

That's what I currently do. If we have fewer than half preloadSize loaded, I load the next preloadSize. That means we vary between 5 and 15.

Maybe preload() could be extended to also check if the context of dr.promises[i] has been cancelled, not only that it's nil. (Another argument could be added to preload() to indicate if we want to overwrite the promises, but the other solution sound more correct to me.)

That's what the TODO is about. However, that requires a change to the promises. See: ipfs/go-ipld-format#34

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see, thanks for the clarification, then maybe we could add that extra argument to indicate that the current promises are cancelled and preload should request them again (overwriting the previous ones), if you think it's worth it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could but that doesn't seem any better to me. It just moves code from one place to another and then puts it behind a condition. Once we merge that linked PR, definitely.

// TODO: check if canceled.
if dr.promises[i] == nil {
dr.preload(ctx, i)
break
}
}

nxt, err := dr.promises[dr.linkPosition].Get(ctx)
if err != nil {
dr.promises[dr.linkPosition] = nil
switch err {
case nil:
case context.DeadlineExceeded, context.Canceled:
err = ctx.Err()
if err != nil {
return ctx.Err()
}
// In this case, the context used to *preload* the node has been canceled.
// We need to retry the load with our context and we might as
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we elaborate a bit more on this? I had the impression we're always using the same Context from the PBDagReader structure. In which case the local ctx won't be cancelled but the one from the NodePromise will be? The context from the promise comes from this function, maybe in another call (at it may have been preloaded before), but at which point can the global context from the DAG reader change?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We're using the context from the call Read call. IMO, that's the correct context as it allows us to cancel reads and seek elsewhere (canceling any associated preloading as well).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, scratch that. If the user is using Read, we use the reader's cotnext. If the user is manually calling CtxReadFull (a context aware read), we use the supplied context. I don't really think any additional comments will help as the context is just whatever gets passed in (and listing every possible context is just going to lead to comment rot when that changes).

Copy link
Contributor

@schomatis schomatis Jul 5, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and listing every possible context is just going to lead to comment rot when that changes).

It seems to me there are only (or mainly) two possibilities. The most common (by far I think) is that the reader's context is used everywhere, so in fact there is only one context (and this part of the code isn't executed). The second, and less common,

If the user is manually calling CtxReadFull

which is when this scenario takes place (if I understand correctly).

Clarifying that distinction seems worth it, IMO.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough.

// well preload some extra nodes while we're at it.
dr.preload(ctx, dr.linkPosition)
nxt, err = dr.promises[dr.linkPosition].Get(ctx)
dr.promises[dr.linkPosition] = nil
if err != nil {
return err
}
default:
return err
}
dr.promises[dr.linkPosition] = nil
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Always remove the promise. There's no reason to leave this around.


dr.linkPosition++

switch nxt := nxt.(type) {
Expand Down