Skip to content

Commit

Permalink
disallow SetReadDeadline after read half is closed
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Feb 20, 2024
1 parent 2924291 commit 7d341f3
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 16 deletions.
29 changes: 14 additions & 15 deletions p2p/transport/webrtc/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,17 @@ func (s *stream) Close() error {
s.Reset()
return errors.Join(closeWriteErr, closeReadErr)
}

s.mx.Lock()
s.controlMessageReaderEndTime = time.Now().Add(maxFINACKWait)
if s.controlMessageReaderEndTime.IsZero() {
s.controlMessageReaderEndTime = time.Now().Add(maxFINACKWait)
s.setDataChannelReadDeadline(time.Now().Add(-1 * time.Hour))
go func() {
s.controlMessageReaderDone.Wait()
s.cleanup()
}()
}
s.mx.Unlock()
s.SetReadDeadline(time.Now().Add(-1 * time.Hour))

go func() {
s.controlMessageReaderDone.Wait()
s.cleanup()
}()
return nil
}

Expand All @@ -165,10 +167,8 @@ func (s *stream) Reset() error {
defer s.cleanup()
cancelWriteErr := s.cancelWrite()
closeReadErr := s.CloseRead()
if cancelWriteErr != nil {
return cancelWriteErr
}
return closeReadErr
s.setDataChannelReadDeadline(time.Now().Add(-1 * time.Hour))
return errors.Join(closeReadErr, cancelWriteErr)
}

func (s *stream) closeForShutdown(closeErr error) {
Expand All @@ -178,7 +178,6 @@ func (s *stream) closeForShutdown(closeErr error) {
defer s.mx.Unlock()

s.closeForShutdownErr = closeErr
s.SetReadDeadline(time.Now().Add(-1 * time.Hour))
s.notifyWriteStateChanged()
}

Expand Down Expand Up @@ -230,18 +229,18 @@ func (s *stream) spawnControlMessageReader() {
go func() {
defer s.controlMessageReaderDone.Done()
// cleanup the sctp deadline timer goroutine
defer s.SetReadDeadline(time.Time{})
defer s.setDataChannelReadDeadline(time.Time{})

setDeadline := func() bool {
if s.controlMessageReaderEndTime.IsZero() || time.Now().Before(s.controlMessageReaderEndTime) {
s.SetReadDeadline(s.controlMessageReaderEndTime)
s.setDataChannelReadDeadline(s.controlMessageReaderEndTime)
return true
}
return false
}

// Unblock any Read call waiting on reader.ReadMsg
s.SetReadDeadline(time.Now().Add(-1 * time.Hour))
s.setDataChannelReadDeadline(time.Now().Add(-1 * time.Hour))

s.readerMx.Lock()
// We have the lock any readers blocked on reader.ReadMsg have exited.
Expand Down
13 changes: 12 additions & 1 deletion p2p/transport/webrtc/stream_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,18 @@ func (s *stream) Read(b []byte) (int, error) {
}
}

func (s *stream) SetReadDeadline(t time.Time) error { return s.dataChannel.SetReadDeadline(t) }
func (s *stream) SetReadDeadline(t time.Time) error {
s.mx.Lock()
defer s.mx.Unlock()
if s.receiveState == receiveStateReceiving {
s.setDataChannelReadDeadline(t)
}
return nil
}

func (s *stream) setDataChannelReadDeadline(t time.Time) error {
return s.dataChannel.SetReadDeadline(t)
}

func (s *stream) CloseRead() error {
s.mx.Lock()
Expand Down

0 comments on commit 7d341f3

Please sign in to comment.