Skip to content

Commit

Permalink
feat: log request / response events
Browse files Browse the repository at this point in the history
  • Loading branch information
dirkmc committed Jan 19, 2021
1 parent 1d268de commit f4d3372
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 21 deletions.
22 changes: 18 additions & 4 deletions impl/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
)

func (m *manager) OnChannelOpened(chid datatransfer.ChannelID) error {
log.Infof("channel %s: opened", chid)

has, err := m.channels.HasChannel(chid)
if err != nil {
return err
Expand Down Expand Up @@ -121,6 +123,8 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra
return m.receiveNewRequest(chid.Initiator, request)
}
if request.IsCancel() {
log.Infof("channel %s: received cancel request, cleaning up channel", chid)

m.transport.CleanupChannel(chid)
return nil, m.channels.Cancel(chid)
}
Expand All @@ -147,6 +151,7 @@ func (m *manager) OnRequestReceived(chid datatransfer.ChannelID, request datatra

func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datatransfer.Response) error {
if response.IsCancel() {
log.Infof("channel %s: received cancel response, cancelling channel", chid)
return m.channels.Cancel(chid)
}
if response.IsVoucherResult() {
Expand All @@ -161,16 +166,19 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat
}
}
if !response.Accepted() {
log.Infof("channel %s: received rejected response, erroring out channel", chid)
return m.channels.Error(chid, datatransfer.ErrRejected)
}
if response.IsNew() {
log.Infof("channel %s: received new response, accepting channel", chid)
err := m.channels.Accept(chid)
if err != nil {
return err
}
}

if response.IsRestart() {
log.Infof("channel %s: received restart response, restarting channel", chid)
err := m.channels.Restart(chid)
if err != nil {
return err
Expand All @@ -179,6 +187,7 @@ func (m *manager) OnResponseReceived(chid datatransfer.ChannelID, response datat
}
if response.IsComplete() && response.Accepted() {
if !response.IsPaused() {
log.Infof("channel %s: received complete response, completing channel", chid)
return m.channels.ResponderCompletes(chid)
}
err := m.channels.ResponderBeginsFinalization(chid)
Expand Down Expand Up @@ -285,8 +294,9 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)
return nil
}
if msg != nil {
if err := m.dataTransferNetwork.SendMessage(context.TODO(), chid.Initiator, msg); err != nil {
log.Warnf("failed to send completion message, err : %v", err)
log.Infof("channel %s: sending completion message", chid)
if err := m.dataTransferNetwork.SendMessage(context.Background(), chid.Initiator, msg); err != nil {
log.Warnf("channel %s: failed to send completion message: %s", chid, err)
return m.OnRequestDisconnected(context.TODO(), chid)
}
}
Expand All @@ -312,6 +322,8 @@ func (m *manager) OnChannelCompleted(chid datatransfer.ChannelID, success bool)
}

func (m *manager) receiveRestartRequest(chid datatransfer.ChannelID, incoming datatransfer.Request) (datatransfer.Response, error) {
log.Infof("channel %s: received restart request", chid)

result, err := m.restartRequest(chid, incoming)
msg, msgErr := m.response(true, false, err, incoming.TransferID(), result)
if msgErr != nil {
Expand All @@ -323,6 +335,8 @@ func (m *manager) receiveRestartRequest(chid datatransfer.ChannelID, incoming da
func (m *manager) receiveNewRequest(
initiator peer.ID,
incoming datatransfer.Request) (datatransfer.Response, error) {
log.Infof("received new channel request from %s", initiator)

result, err := m.acceptRequest(initiator, incoming)
msg, msgErr := m.response(false, true, err, incoming.TransferID(), result)
if msgErr != nil {
Expand All @@ -340,7 +354,7 @@ func (m *manager) restartRequest(chid datatransfer.ChannelID,
}

if err := m.validateRestartRequest(context.Background(), initiator, chid, incoming); err != nil {
return nil, err
return nil, xerrors.Errorf("restart request for channel %s failed validation: %w", chid, err)
}

stor, err := incoming.Selector()
Expand All @@ -361,7 +375,7 @@ func (m *manager) restartRequest(chid datatransfer.ChannelID,
}
}
if err := m.channels.Restart(chid); err != nil {
return result, err
return result, xerrors.Errorf("failed to restart channel %s: %w", chid, err)
}
processor, has := m.transportConfigurers.Processor(voucher.Type())
if has {
Expand Down
18 changes: 17 additions & 1 deletion impl/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,8 @@ func (m *manager) notifier(evt datatransfer.Event, chst datatransfer.ChannelStat

// Start initializes data transfer processing
func (m *manager) Start(ctx context.Context) error {
log.Info("start data-transfer module")

go func() {
err := m.channels.Start(ctx)
if err != nil {
Expand All @@ -189,6 +191,7 @@ func (m *manager) Start(ctx context.Context) error {
log.Warnf("Publish data transfer ready event: %s", err.Error())
}
}()

dtReceiver := &receiver{m}
m.dataTransferNetwork.SetDelegate(dtReceiver)
return m.transport.SetEventHandler(m)
Expand All @@ -201,6 +204,7 @@ func (m *manager) OnReady(ready datatransfer.ReadyFunc) {

// Stop terminates all data transfers and ends processing
func (m *manager) Stop(ctx context.Context) error {
log.Info("stop data-transfer module")
m.pushChannelMonitor.Shutdown()
return m.transport.Shutdown(ctx)
}
Expand All @@ -221,6 +225,8 @@ func (m *manager) RegisterVoucherType(voucherType datatransfer.Voucher, validato
// OpenPushDataChannel opens a data transfer that will send data to the recipient peer and
// transfer parts of the piece that match the selector
func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.ChannelID, error) {
log.Infof("open push channel to %s with base cid %s", requestTo, baseCid)

req, err := m.newRequest(ctx, selector, false, voucher, baseCid, requestTo)
if err != nil {
return datatransfer.ChannelID{}, err
Expand Down Expand Up @@ -257,6 +263,8 @@ func (m *manager) OpenPushDataChannel(ctx context.Context, requestTo peer.ID, vo
// OpenPullDataChannel opens a data transfer that will request data from the sending peer and
// transfer parts of the piece that match the selector
func (m *manager) OpenPullDataChannel(ctx context.Context, requestTo peer.ID, voucher datatransfer.Voucher, baseCid cid.Cid, selector ipld.Node) (datatransfer.ChannelID, error) {
log.Infof("open pull channel to %s with base cid %s", requestTo, baseCid)

req, err := m.newRequest(ctx, selector, true, voucher, baseCid, requestTo)
if err != nil {
return datatransfer.ChannelID{}, err
Expand Down Expand Up @@ -304,15 +312,18 @@ func (m *manager) SendVoucher(ctx context.Context, channelID datatransfer.Channe

// close an open channel (effectively a cancel)
func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
log.Infof("close channel %s", chid)

chst, err := m.channels.GetByID(ctx, chid)
if err != nil {
return err
}
err = m.transport.CloseChannel(ctx, chid)
if err != nil {
log.Warnf("unable to close channel: %w", err)
log.Warnf("unable to close channel %s: %s", chid, err)
}

log.Infof("%s: sending close channel to %s for channel %s", m.peerID, chst.OtherPeer(), chid)
err = m.dataTransferNetwork.SendMessage(ctx, chst.OtherPeer(), m.cancelMessage(chid))
if err != nil {
err = fmt.Errorf("Unable to send cancel message: %w", err)
Expand All @@ -333,6 +344,7 @@ func (m *manager) CloseDataTransferChannel(ctx context.Context, chid datatransfe

// pause a running data transfer channel
func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
log.Infof("pause channel %s", chid)

pausable, ok := m.transport.(datatransfer.PauseableTransport)
if !ok {
Expand All @@ -355,6 +367,8 @@ func (m *manager) PauseDataTransferChannel(ctx context.Context, chid datatransfe

// resume a running data transfer channel
func (m *manager) ResumeDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
log.Infof("resume channel %s", chid)

pausable, ok := m.transport.(datatransfer.PauseableTransport)
if !ok {
return datatransfer.ErrUnsupported
Expand Down Expand Up @@ -427,6 +441,8 @@ func (m *manager) RegisterTransportConfigurer(voucherType datatransfer.Voucher,

// RestartDataTransferChannel restarts data transfer on the channel with the given channelId
func (m *manager) RestartDataTransferChannel(ctx context.Context, chid datatransfer.ChannelID) error {
log.Infof("restart channel %s", chid)

channel, err := m.channels.GetByID(ctx, chid)
if err != nil {
return xerrors.Errorf("failed to fetch channel: %w", err)
Expand Down
19 changes: 12 additions & 7 deletions impl/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func (r *receiver) ReceiveRequest(
incoming datatransfer.Request) {
err := r.receiveRequest(ctx, initiator, incoming)
if err != nil {
log.Warn(err)
log.Warnf("error processing request from %s: %s", initiator, err)
}
}

Expand Down Expand Up @@ -97,6 +97,9 @@ func (r *receiver) receiveResponse(
return r.manager.transport.(datatransfer.PauseableTransport).PauseChannel(ctx, chid)
}
if err != nil {
log.Warnf("closing channel %s after getting error processing response from %s: %s",
chid, sender, err)

_ = r.manager.transport.CloseChannel(ctx, chid)
return err
}
Expand All @@ -113,10 +116,12 @@ func (r *receiver) ReceiveRestartExistingChannelRequest(ctx context.Context,

ch, err := incoming.RestartChannelId()
if err != nil {
log.Errorf("failed to fetch restart channel Id: %w", err)
log.Errorf("cannot restart channel: failed to fetch channel Id: %w", err)
return
}

log.Infof("channel %s: received restart existing channel request from %s", ch, sender)

// validate channel exists -> in non-terminal state and that the sender matches
channel, err := r.manager.channels.GetByID(ctx, ch)
if err != nil || channel == nil {
Expand All @@ -126,30 +131,30 @@ func (r *receiver) ReceiveRestartExistingChannelRequest(ctx context.Context,

// initiator should be me
if channel.ChannelID().Initiator != r.manager.peerID {
log.Error("channel initiator is not the manager peer")
log.Error("cannot restart channel %s: channel initiator is not the manager peer", ch)
return
}

// other peer should be the counter party on the channel
if channel.OtherPeer() != sender {
log.Error("channel counterparty is not the sender peer")
log.Error("cannot restart channel %s: channel counterparty is not the sender peer", ch)
return
}

// channel should NOT be terminated
if channels.IsChannelTerminated(channel.Status()) {
log.Error("channel is already terminated")
log.Error("cannot restart channel %s: channel already terminated", ch)
return
}

switch r.manager.channelDataTransferType(channel) {
case ManagerPeerCreatePush:
if err := r.manager.openPushRestartChannel(ctx, channel); err != nil {
log.Errorf("failed to open push restart channel: %w", err)
log.Errorf("failed to open push restart channel %s: %s", ch, err)
}
case ManagerPeerCreatePull:
if err := r.manager.openPullRestartChannel(ctx, channel); err != nil {
log.Errorf("failed to open pull restart channel: %w", err)
log.Errorf("failed to open pull restart channel %s: %s", ch, err)
}
default:
log.Error("peer is not the creator of the channel")
Expand Down
22 changes: 13 additions & 9 deletions impl/responding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package impl_test

import (
"context"
"fmt"
"math/rand"
"os"
"testing"
Expand Down Expand Up @@ -782,7 +783,8 @@ func TestDataTransferRestartResponding(t *testing.T) {
},
verify: func(t *testing.T, h *receiverHarness) {
// receive an incoming pull
_, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest)
chid := channelID(h.id, h.peers)
_, err := h.transport.EventHandler.OnRequestReceived(chid, h.pullRequest)
require.NoError(t, err)
require.Len(t, h.sv.ValidationsReceived, 1)
require.Len(t, h.transport.OpenedChannels, 0)
Expand All @@ -792,8 +794,8 @@ func TestDataTransferRestartResponding(t *testing.T) {
randCid := testutil.GenerateCids(1)[0]
restartReq, err := message.NewRequest(h.id, true, true, h.voucher.Type(), h.voucher, randCid, h.stor)
require.NoError(t, err)
_, err = h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), restartReq)
require.EqualError(t, err, "base cid does not match")
_, err = h.transport.EventHandler.OnRequestReceived(chid, restartReq)
require.EqualError(t, err, fmt.Sprintf("restart request for channel %s failed validation: base cid does not match", chid))
},
},
"restart request fails if voucher type is not decodable": {
Expand All @@ -804,7 +806,8 @@ func TestDataTransferRestartResponding(t *testing.T) {
},
verify: func(t *testing.T, h *receiverHarness) {
// receive an incoming pull
_, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest)
chid := channelID(h.id, h.peers)
_, err := h.transport.EventHandler.OnRequestReceived(chid, h.pullRequest)
require.NoError(t, err)
require.Len(t, h.sv.ValidationsReceived, 1)
require.Len(t, h.transport.OpenedChannels, 0)
Expand All @@ -814,8 +817,8 @@ func TestDataTransferRestartResponding(t *testing.T) {

restartReq, err := message.NewRequest(h.id, true, true, "rand", h.voucher, h.baseCid, h.stor)
require.NoError(t, err)
_, err = h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), restartReq)
require.EqualError(t, err, "failed to decode request voucher: unknown voucher type: rand")
_, err = h.transport.EventHandler.OnRequestReceived(chid, restartReq)
require.EqualError(t, err, fmt.Sprintf("restart request for channel %s failed validation: failed to decode request voucher: unknown voucher type: rand", chid))
},
},
"restart request fails if voucher does not match": {
Expand All @@ -826,7 +829,8 @@ func TestDataTransferRestartResponding(t *testing.T) {
},
verify: func(t *testing.T, h *receiverHarness) {
// receive an incoming pull
_, err := h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), h.pullRequest)
chid := channelID(h.id, h.peers)
_, err := h.transport.EventHandler.OnRequestReceived(chid, h.pullRequest)
require.NoError(t, err)
require.Len(t, h.sv.ValidationsReceived, 1)
require.Len(t, h.transport.OpenedChannels, 0)
Expand All @@ -837,8 +841,8 @@ func TestDataTransferRestartResponding(t *testing.T) {
v.Data = "rand"
restartReq, err := message.NewRequest(h.id, true, true, h.voucher.Type(), v, h.baseCid, h.stor)
require.NoError(t, err)
_, err = h.transport.EventHandler.OnRequestReceived(channelID(h.id, h.peers), restartReq)
require.EqualError(t, err, "channel and request vouchers do not match")
_, err = h.transport.EventHandler.OnRequestReceived(chid, restartReq)
require.EqualError(t, err, fmt.Sprintf("restart request for channel %s failed validation: channel and request vouchers do not match", chid))
},
},
"ReceiveRestartExistingChannelRequest: Reopen Pull Channel": {
Expand Down
4 changes: 4 additions & 0 deletions impl/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ func (m *manager) openPushRestartChannel(ctx context.Context, channel datatransf
transportConfigurer(chid, voucher, m.transport)
}
m.dataTransferNetwork.Protect(requestTo, chid.String())

log.Infof("sending push restart channel to %s for channel %s", requestTo, chid)
if err := m.dataTransferNetwork.SendMessage(ctx, requestTo, req); err != nil {
return xerrors.Errorf("Unable to send restart request: %w", err)
}
Expand All @@ -123,6 +125,8 @@ func (m *manager) openPullRestartChannel(ctx context.Context, channel datatransf
transportConfigurer(chid, voucher, m.transport)
}
m.dataTransferNetwork.Protect(requestTo, chid.String())

log.Infof("sending open channel to %s to restart channel %s", requestTo, chid)
if err := m.transport.OpenChannel(ctx, requestTo, chid, cidlink.Link{Cid: baseCid}, selector, channel.ReceivedCids(), req); err != nil {
return xerrors.Errorf("Unable to send open channel restart request: %w", err)
}
Expand Down

0 comments on commit f4d3372

Please sign in to comment.