From 11e52e4a7395f3eb091417c8220a33c9da315dda Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 4 May 2017 18:00:15 -0700 Subject: [PATCH] WIP: wire sessions up through into FetchGraph License: MIT Signed-off-by: Jeromy --- blockservice/blockservice.go | 57 ++++++++++++++++++++++++++++++++---- exchange/bitswap/session.go | 5 ++++ exchange/interface.go | 16 +++++++--- merkledag/merkledag.go | 25 ++++++++++++++-- 4 files changed, 91 insertions(+), 12 deletions(-) diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index 294b541338e..5b66ee78d0b 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -11,6 +11,7 @@ import ( blocks "github.com/ipfs/go-ipfs/blocks" "github.com/ipfs/go-ipfs/blocks/blockstore" exchange "github.com/ipfs/go-ipfs/exchange" + bitswap "github.com/ipfs/go-ipfs/exchange/bitswap" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" cid "gx/ipfs/QmYhQaCYEcaPPjxJX7YcPcVKkQfRy6sJ7B3XmGFk82XYdQ/go-cid" @@ -31,6 +32,7 @@ type BlockService interface { GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block DeleteBlock(o blocks.Block) error + NewSession(context.Context) *Session Close() error } @@ -77,6 +79,21 @@ func (bs *blockService) Exchange() exchange.Interface { return bs.exchange } +func (bs *blockService) NewSession(ctx context.Context) *Session { + bswap, ok := bs.Exchange().(*bitswap.Bitswap) + if ok { + ses := bswap.NewSession(ctx) + return &Session{ + ses: ses, + bs: bs.blockstore, + } + } + return &Session{ + ses: bs.exchange, + bs: bs.blockstore, + } +} + // AddBlock adds a particular block to the service, Putting it into the datastore. // TODO pass a context into this if the remote.HasBlock is going to remain here. func (s *blockService) AddBlock(o blocks.Block) (*cid.Cid, error) { @@ -141,16 +158,25 @@ func (s *blockService) AddBlocks(bs []blocks.Block) ([]*cid.Cid, error) { func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { log.Debugf("BlockService GetBlock: '%s'", c) - block, err := s.blockstore.Get(c) + var f exchange.Fetcher + if s.exchange != nil { + f = s.exchange + } + + return getBlock(ctx, c, s.blockstore, f) +} + +func getBlock(ctx context.Context, c *cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) (blocks.Block, error) { + block, err := bs.Get(c) if err == nil { return block, nil } - if err == blockstore.ErrNotFound && s.exchange != nil { + if err == blockstore.ErrNotFound && f != nil { // TODO be careful checking ErrNotFound. If the underlying // implementation changes, this will break. log.Debug("Blockservice: Searching bitswap") - blk, err := s.exchange.GetBlock(ctx, c) + blk, err := f.GetBlock(ctx, c) if err != nil { if err == blockstore.ErrNotFound { return nil, ErrNotFound @@ -172,12 +198,16 @@ func (s *blockService) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, // the returned channel. // NB: No guarantees are made about order. func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { + return getBlocks(ctx, ks, s.blockstore, s.exchange) +} + +func getBlocks(ctx context.Context, ks []*cid.Cid, bs blockstore.Blockstore, f exchange.Fetcher) <-chan blocks.Block { out := make(chan blocks.Block, 0) go func() { defer close(out) var misses []*cid.Cid for _, c := range ks { - hit, err := s.blockstore.Get(c) + hit, err := bs.Get(c) if err != nil { misses = append(misses, c) continue @@ -194,7 +224,7 @@ func (s *blockService) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan bloc return } - rblocks, err := s.exchange.GetBlocks(ctx, misses) + rblocks, err := f.GetBlocks(ctx, misses) if err != nil { log.Debugf("Error with GetBlocks: %s", err) return @@ -220,3 +250,20 @@ func (s *blockService) Close() error { log.Debug("blockservice is shutting down...") return s.exchange.Close() } + +type Session struct { + bs blockstore.Blockstore + ses exchange.Session +} + +func (s *Session) GetBlock(ctx context.Context, c *cid.Cid) (blocks.Block, error) { + return getBlock(ctx, c, s.bs, s.ses) +} + +func (s *Session) GetBlocks(ctx context.Context, ks []*cid.Cid) <-chan blocks.Block { + return getBlocks(ctx, ks, s.bs, s.ses) +} + +func (s *Session) Close() error { + return s.ses.Close() +} diff --git a/exchange/bitswap/session.go b/exchange/bitswap/session.go index 4adcd344b0b..2c8d72b0e65 100644 --- a/exchange/bitswap/session.go +++ b/exchange/bitswap/session.go @@ -219,6 +219,11 @@ func (s *Session) fetch(ctx context.Context, keys []*cid.Cid) { } } +func (s *Session) Close() error { + // TODO: + return nil +} + // GetBlocks fetches a set of blocks within the context of this session and // returns a channel that found blocks will be returned on. No order is // guaranteed on the returned blocks. diff --git a/exchange/interface.go b/exchange/interface.go index 58c4c14aec6..ce555322a45 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -13,10 +13,7 @@ import ( // Any type that implements exchange.Interface may be used as an IPFS block // exchange protocol. type Interface interface { // type Exchanger interface - // GetBlock returns the block associated with a given key. - GetBlock(context.Context, *cid.Cid) (blocks.Block, error) - - GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error) + Fetcher // TODO Should callers be concerned with whether the block was made // available on the network? @@ -26,3 +23,14 @@ type Interface interface { // type Exchanger interface io.Closer } + +type Fetcher interface { + // GetBlock returns the block associated with a given key. + GetBlock(context.Context, *cid.Cid) (blocks.Block, error) + GetBlocks(context.Context, []*cid.Cid) (<-chan blocks.Block, error) +} + +type Session interface { + Fetcher + Close() error +} diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index e0dc5c74ae1..bde92d22184 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -153,7 +153,7 @@ func (n *dagService) Remove(nd node.Node) error { // GetLinksDirect creates a function to get the links for a node, from // the node, bypassing the LinkService. If the node does not exist // locally (and can not be retrieved) an error will be returned. -func GetLinksDirect(serv DAGService) GetLinks { +func GetLinksDirect(serv node.NodeGetter) GetLinks { return func(ctx context.Context, c *cid.Cid) ([]*node.Link, error) { node, err := serv.Get(ctx, c) if err != nil { @@ -163,11 +163,30 @@ func GetLinksDirect(serv DAGService) GetLinks { } } +type sesGetter struct { + bs *bserv.Session +} + +func (sg *sesGetter) Get(ctx context.Context, c *cid.Cid) (node.Node, error) { + blk, err := sg.bs.GetBlock(ctx, c) + if err != nil { + return nil, err + } + + return decodeBlock(blk) +} + // FetchGraph fetches all nodes that are children of the given node func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error { + var ng node.NodeGetter = serv + ds, ok := serv.(*dagService) + if ok { + ng = &sesGetter{ds.Blocks.NewSession(ctx)} + } + v, _ := ctx.Value("progress").(*ProgressTracker) if v == nil { - return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, cid.NewSet().Visit) + return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit) } set := cid.NewSet() visit := func(c *cid.Cid) bool { @@ -178,7 +197,7 @@ func FetchGraph(ctx context.Context, root *cid.Cid, serv DAGService) error { return false } } - return EnumerateChildrenAsync(ctx, GetLinksDirect(serv), root, visit) + return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit) } // FindLinks searches this nodes links for the given key,