Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dont double-count queued / sent / received data when channel restarted #140

Merged
merged 2 commits into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 81 additions & 8 deletions channels/channels.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipld/go-ipld-prime"
peer "github.com/libp2p/go-libp2p-core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/filecoin-project/go-data-transfer/channels/internal"
"github.com/filecoin-project/go-data-transfer/channels/internal/migrations"
"github.com/filecoin-project/go-data-transfer/cidlists"
"github.com/filecoin-project/go-data-transfer/cidsets"
"github.com/filecoin-project/go-data-transfer/encoding"
)

Expand Down Expand Up @@ -53,6 +55,7 @@ type Channels struct {
stateMachines fsm.Group
migrateStateMachines func(context.Context) error
cidLists cidlists.CIDLists
seenCIDs *cidsets.CIDSetManager
}

// ChannelEnvironment -- just a proxy for DTNetwork for now
Expand All @@ -71,8 +74,11 @@ func New(ds datastore.Batching,
voucherResultDecoder DecoderByTypeFunc,
env ChannelEnvironment,
selfPeer peer.ID) (*Channels, error) {

seenCIDsDS := namespace.Wrap(ds, datastore.NewKey("seencids"))
c := &Channels{
cidLists: cidLists,
seenCIDs: cidsets.NewCIDSetManager(seenCIDsDS),
notifier: notifier,
voucherDecoder: voucherDecoder,
voucherResultDecoder: voucherResultDecoder,
Expand Down Expand Up @@ -117,6 +123,19 @@ func (c *Channels) dispatch(eventName fsm.EventName, channel fsm.StateType) {
}

c.notifier(evt, fromInternalChannelState(realChannel, c.voucherDecoder, c.voucherResultDecoder, c.cidLists.ReadList))

// When the channel has been cleaned up, remove the caches of seen cids
if evt.Code == datatransfer.CleanupComplete {
chid := datatransfer.ChannelID{
Initiator: realChannel.Initiator,
Responder: realChannel.Responder,
ID: realChannel.TransferID,
}
err := c.removeSeenCIDCaches(chid)
if err != nil {
log.Errorf("failed to clean up channel %s: %s", err)
}
}
}

// CreateNew creates a new channel id and channel state and saves to channels.
Expand Down Expand Up @@ -206,20 +225,21 @@ func (c *Channels) CompleteCleanupOnRestart(chid datatransfer.ChannelID) error {
return c.send(chid, datatransfer.CompleteCleanupOnRestart)
}

func (c *Channels) DataSent(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
return c.send(chid, datatransfer.DataSent, delta)
func (c *Channels) DataSent(chid datatransfer.ChannelID, k cid.Cid, delta uint64) error {
return c.fireProgressEvent(chid, datatransfer.DataSent, datatransfer.DataSentProgress, k, delta)
}

func (c *Channels) DataQueued(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
return c.send(chid, datatransfer.DataQueued, delta)
func (c *Channels) DataQueued(chid datatransfer.ChannelID, k cid.Cid, delta uint64) error {
return c.fireProgressEvent(chid, datatransfer.DataQueued, datatransfer.DataQueuedProgress, k, delta)
}

func (c *Channels) DataReceived(chid datatransfer.ChannelID, cid cid.Cid, delta uint64) error {
err := c.cidLists.AppendList(chid, cid)
func (c *Channels) DataReceived(chid datatransfer.ChannelID, k cid.Cid, delta uint64) error {
err := c.cidLists.AppendList(chid, k)
if err != nil {
return err
}
return c.send(chid, datatransfer.DataReceived, delta)

return c.fireProgressEvent(chid, datatransfer.DataReceived, datatransfer.DataReceivedProgress, k, delta)
}

// PauseInitiator pauses the initator of this channel
Expand Down Expand Up @@ -304,7 +324,60 @@ func (c *Channels) HasChannel(chid datatransfer.ChannelID) (bool, error) {
return c.stateMachines.Has(chid)
}

// removeSeenCIDCaches cleans up the caches of "seen" blocks, ie
// blocks that have already been queued / sent / received
func (c *Channels) removeSeenCIDCaches(chid datatransfer.ChannelID) error {
progressStates := []datatransfer.EventCode{
datatransfer.DataQueuedProgress,
datatransfer.DataSentProgress,
datatransfer.DataReceivedProgress,
}
for _, evt := range progressStates {
sid := cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
err := c.seenCIDs.DeleteSet(sid)
if err != nil {
return err
}
}
return nil
}

// onProgress fires an event indicating progress has been made in
// queuing / sending / receiving blocks.
// These events are fired only for new blocks (not for example if
// a block is resent)
func (c *Channels) fireProgressEvent(chid datatransfer.ChannelID, evt datatransfer.EventCode, progressEvt datatransfer.EventCode, k cid.Cid, delta uint64) error {
if err := c.checkChannelExists(chid, evt); err != nil {
return err
}

// Check if the block has already been seen
sid := cidsets.SetID(chid.String() + "/" + datatransfer.Events[evt])
seen, err := c.seenCIDs.InsertSetCID(sid, k)
if err != nil {
return err
}

// If the block has not been seen before, fire the progress event
if !seen {
if err := c.stateMachines.Send(chid, progressEvt, delta); err != nil {
return err
}
}

// Fire the regular event
return c.stateMachines.Send(chid, evt)
}

func (c *Channels) send(chid datatransfer.ChannelID, code datatransfer.EventCode, args ...interface{}) error {
err := c.checkChannelExists(chid, code)
if err != nil {
return err
}
return c.stateMachines.Send(chid, code, args...)
}

func (c *Channels) checkChannelExists(chid datatransfer.ChannelID, code datatransfer.EventCode) error {
has, err := c.stateMachines.Has(chid)
if err != nil {
return err
Expand All @@ -313,5 +386,5 @@ func (c *Channels) send(chid datatransfer.ChannelID, code datatransfer.EventCode
return xerrors.Errorf("cannot send FSM event %s to data-transfer channel %s: %w",
datatransfer.Events[code], chid, NewErrNotFound(chid))
}
return c.stateMachines.Send(chid, code, args...)
return nil
}
61 changes: 28 additions & 33 deletions channels/channels_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,16 @@ import (

var log = logging.Logger("data-transfer")

var transferringStates = []fsm.StateKey{
datatransfer.Requested,
datatransfer.Ongoing,
datatransfer.InitiatorPaused,
datatransfer.ResponderPaused,
datatransfer.BothPaused,
datatransfer.ResponderCompleted,
datatransfer.ResponderFinalizing,
}

// ChannelEvents describe the events taht can
var ChannelEvents = fsm.Events{
fsm.Event(datatransfer.Open).FromAny().To(datatransfer.Requested),
Expand All @@ -23,40 +33,25 @@ var ChannelEvents = fsm.Events{

fsm.Event(datatransfer.Cancel).FromAny().To(datatransfer.Cancelling),

fsm.Event(datatransfer.DataReceived).FromMany(
datatransfer.Requested,
datatransfer.Ongoing,
datatransfer.InitiatorPaused,
datatransfer.ResponderPaused,
datatransfer.BothPaused,
datatransfer.ResponderCompleted,
datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Received += delta
return nil
}),
fsm.Event(datatransfer.DataReceived).FromMany(transferringStates...).ToNoChange(),
fsm.Event(datatransfer.DataReceivedProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Received += delta
return nil
}),

fsm.Event(datatransfer.DataSent).FromMany(
datatransfer.Requested,
datatransfer.Ongoing,
datatransfer.InitiatorPaused,
datatransfer.ResponderPaused,
datatransfer.BothPaused,
datatransfer.ResponderCompleted,
datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Sent += delta
return nil
}),
fsm.Event(datatransfer.DataQueued).FromMany(
datatransfer.Requested,
datatransfer.Ongoing,
datatransfer.InitiatorPaused,
datatransfer.ResponderPaused,
datatransfer.BothPaused,
datatransfer.ResponderCompleted,
datatransfer.ResponderFinalizing).ToNoChange().Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Queued += delta
return nil
}),
fsm.Event(datatransfer.DataSent).FromMany(transferringStates...).ToNoChange(),
fsm.Event(datatransfer.DataSentProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Sent += delta
return nil
}),
fsm.Event(datatransfer.DataQueued).FromMany(transferringStates...).ToNoChange(),
fsm.Event(datatransfer.DataQueuedProgress).FromMany(transferringStates...).ToNoChange().
Action(func(chst *internal.ChannelState, delta uint64) error {
chst.Queued += delta
return nil
}),
fsm.Event(datatransfer.Disconnected).FromAny().ToNoChange().Action(func(chst *internal.ChannelState) error {
chst.Message = datatransfer.ErrDisconnected.Error()
return nil
Expand Down
9 changes: 6 additions & 3 deletions channels/channels_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,15 @@ func TestChannels(t *testing.T) {

err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
require.NoError(t, err)
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
require.Equal(t, uint64(50), state.Received())
require.Equal(t, uint64(0), state.Sent())
require.Equal(t, []cid.Cid{cids[0]}, state.ReceivedCids())

err = channelList.DataSent(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 100)
require.NoError(t, err)
_ = checkEvent(ctx, t, received, datatransfer.DataSentProgress)
state = checkEvent(ctx, t, received, datatransfer.DataSent)
require.Equal(t, uint64(50), state.Received())
require.Equal(t, uint64(100), state.Sent())
Expand All @@ -167,6 +169,7 @@ func TestChannels(t *testing.T) {

err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[1], 50)
require.NoError(t, err)
_ = checkEvent(ctx, t, received, datatransfer.DataReceivedProgress)
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())
Expand All @@ -176,14 +179,14 @@ func TestChannels(t *testing.T) {
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.DataSent)
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(125), state.Sent())
require.Equal(t, uint64(100), state.Sent())
require.Equal(t, []cid.Cid{cids[0], cids[1]}, state.ReceivedCids())

err = channelList.DataReceived(datatransfer.ChannelID{Initiator: peers[0], Responder: peers[1], ID: tid1}, cids[0], 50)
require.NoError(t, err)
state = checkEvent(ctx, t, received, datatransfer.DataReceived)
require.Equal(t, uint64(150), state.Received())
require.Equal(t, uint64(125), state.Sent())
require.Equal(t, uint64(100), state.Received())
require.Equal(t, uint64(100), state.Sent())
require.Equal(t, []cid.Cid{cids[0], cids[1], cids[0]}, state.ReceivedCids())
})

Expand Down
111 changes: 111 additions & 0 deletions cidsets/cidsets.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package cidsets

import (
"sync"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-datastore/query"
)

// SetID is a unique ID for a CID set
type SetID string

// CIDSetManager keeps track of several CID sets, by SetID
type CIDSetManager struct {
ds datastore.Datastore
lk sync.Mutex
sets map[SetID]*cidSet
}

func NewCIDSetManager(ds datastore.Datastore) *CIDSetManager {
return &CIDSetManager{ds: ds, sets: make(map[SetID]*cidSet)}
}

// InsertSetCID inserts a CID into a CID set.
// Returns true if the set already contained the CID.
func (mgr *CIDSetManager) InsertSetCID(sid SetID, c cid.Cid) (exists bool, err error) {
return mgr.getSet(sid).Insert(c)
}

// DeleteSet deletes a CID set
func (mgr *CIDSetManager) DeleteSet(sid SetID) error {
return mgr.getSet(sid).Truncate()
}

// getSet gets the cidSet for the given SetID
func (mgr *CIDSetManager) getSet(sid SetID) *cidSet {
mgr.lk.Lock()
defer mgr.lk.Unlock()

s, ok := mgr.sets[sid]
if !ok {
s = NewCIDSet(mgr.getSetDS(sid))
mgr.sets[sid] = s
}
return s
}

// getSetDS gets the wrapped datastore for the given SetID
func (mgr *CIDSetManager) getSetDS(sid SetID) datastore.Batching {
setDSKey := datastore.NewKey(string(sid) + "/cids")
return namespace.Wrap(mgr.ds, setDSKey)
}

// cidSet persists a set of CIDs
type cidSet struct {
lk sync.Mutex
ds datastore.Batching
}

func NewCIDSet(ds datastore.Batching) *cidSet {
return &cidSet{ds: ds}
}

// Insert a CID into the set.
// Returns true if the the CID was already in the set.
func (s *cidSet) Insert(c cid.Cid) (exists bool, err error) {
s.lk.Lock()
defer s.lk.Unlock()

k := datastore.NewKey(c.String())
has, err := s.ds.Has(k)
if err != nil {
return false, err
}
if has {
return true, nil
}
return false, s.ds.Put(k, nil)
}

// Truncate removes all CIDs in the set
func (s *cidSet) Truncate() error {
s.lk.Lock()
defer s.lk.Unlock()

res, err := s.ds.Query(query.Query{KeysOnly: true})
if err != nil {
return err
}

entries, err := res.Rest()
if err != nil {
return err
}

batched, err := s.ds.Batch()
if err != nil {
return err
}

for _, entry := range entries {
err := batched.Delete(datastore.NewKey(entry.Key))
if err != nil {
return err
}
}

return batched.Commit()
}
Loading