forked from amy/Bittorrent
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathpeercontactmanager.go
229 lines (193 loc) · 5.33 KB
/
peercontactmanager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
package main
/*
manages connections to multiple peers simultaneously
responsible for managing incoming/outgoing connections
*/
import (
//"bufio"
//"bytes"
//"encoding/binary"
//"errors"
"fmt"
// "log"
"math"
"net"
"strconv"
"sync"
//"strings"
"time"
)
// TorrentInfo consolidates the values from the torrent file that the peer
// code needs. It is not the same as InfoDict: it holds a pointer to the
// InfoDict together with the client ID, protocol name and info hash.
type TorrentInfo struct {
TInfo *InfoDict // info dictionary of the torrent
ClientID string // peer ID the client chose for itself
ProtoName string // BitTorrent protocol name/version string
ProtoNameLen int // length — presumably of ProtoName; confirm where the handshake is built
InfoHash string // info hash for this torrent
}
// PeerContactManager holds the state shared by all peer connections of a
// single torrent and is used to start outgoing and incoming connections.
type PeerContactManager struct {
	// information about the torrent (see TorrentInfo)
	tInfo TorrentInfo
	// manages requests for pieces; a pointer to it is handed to every connection
	pieceManager PieceManager

	// configured limits on simultaneous connections and unchoked peers;
	// stored by the constructor
	maxConnections, maxUnchoked uint32

	// in receives requests for unchoking, out carries the response
	in, out chan bool

	// maximum number of pieces queued up for a peer
	msgQueueMax int
	// incremented for every connection handler, released when it ends
	wg *sync.WaitGroup

	// NOTE(review): the four fields below are not referenced by the methods
	// in this file section — confirm whether they are still needed
	inComingChanListLock, outGoingChanListLock *sync.Mutex
	outGoing, inComing                         []chan bool

	// tracker that is told "completed" once the download finishes
	tracker *TrackerInfo
	// signalled by StartOutgoing after its first dial attempt; starts the
	// download timer in the goroutine launched by the constructor
	waitToDownload chan bool
}
// NewPeerContactManager creates a new PeerContactManager and starts a
// background goroutine that reports completion of the download to the tracker.
// @tracker: tracker that receives the "completed" request
// @wg: wait group the connection handlers register on
// @tInfo: torrent information (see TorrentInfo)
// @fileName: file name handed to NewPieceManager
// @maxConnections: maximum TCP connections to peers (in or out) allowed
// @maxUnchoked: maximum number of peers we can unchoke at once
// @maxMsgQueue: maximum number of pieces queued up for a peer
// returns: new PeerContactManager
//
// NOTE(review): the goroutine closes over the local p while the caller gets a
// copy of it; whether both observe the same piece state depends on how
// PieceManager shares its internals — confirm.
func NewPeerContactManager(tracker *TrackerInfo, wg *sync.WaitGroup, tInfo TorrentInfo, fileName string, maxConnections uint32, maxUnchoked uint32, maxMsgQueue int) PeerContactManager {
	p := PeerContactManager{
		wg:    wg,
		tInfo: tInfo,
		// global manager for pieces we have and need
		// (the meaning of the constant 10 is defined by NewPieceManager)
		pieceManager:   NewPieceManager(tInfo.TInfo, 10, fileName),
		maxConnections: maxConnections,
		maxUnchoked:    maxUnchoked,
		in:             make(chan bool), // receive requests for unchoking
		out:            make(chan bool), // respond to requests for unchoking
		msgQueueMax:    maxMsgQueue,
		tracker:        tracker,
		waitToDownload: make(chan bool),
	}

	go func(tr *TrackerInfo) {
		finished := p.pieceManager.WaitForDownload()

		// the clock starts once StartOutgoing signals its first dial attempt
		<-p.waitToDownload
		started := time.Now()

		<-finished
		fmt.Println("Time for Download: ", time.Since(started))

		tr.Uploaded, tr.Downloaded, tr.Left = p.GetProgress()
		// a remainder smaller than one piece is reported as nothing left
		if math.Abs(float64(tr.Left)) < float64(p.tInfo.TInfo.PieceLength) {
			tr.Left = 0
		}
		tr.sendGetRequest("completed")
		fmt.Println("Download complete")
	}(p.tracker)

	return p
}
// StartOutgoing dials every peer in the list, spawns a handler goroutine for
// each connection that could be established and then blocks until all
// handlers registered on the wait group have finished.
// A peer that cannot be reached is reported and skipped so that one dead peer
// does not abort the remaining ones.
// @peers: list of peers to contact
// returns: the first dial error if no peer at all could be reached, else nil
//
// NOTE(review): maxConnections is not enforced here; every peer is dialed.
func (t *PeerContactManager) StartOutgoing(peers []Peer) error {
	// bound each dial so a single unresponsive peer cannot stall the loop
	const dialTimeout = 10 * time.Second

	var firstErr error
	connected := 0
	for i, peerEntry := range peers {
		// 1.) make TCP connection; JoinHostPort also brackets IPv6 literals,
		// which plain "ip:port" concatenation gets wrong
		addr := net.JoinHostPort(peerEntry.IP, strconv.FormatInt(peerEntry.Port, 10))
		conn, err := net.DialTimeout("tcp", addr, dialTimeout)

		if i == 0 {
			// Signal the goroutine started by the constructor after the first
			// dial attempt (successful or not). This send blocks until that
			// goroutine receives, and it receives only once, so a second call
			// to StartOutgoing would block here.
			t.waitToDownload <- true
		}

		if err != nil {
			fmt.Printf("could not connect to %v: %v\n", addr, err)
			if firstErr == nil {
				firstErr = err
			}
			continue
		}

		// 2.) spawn routine to handle connection
		connected++
		t.wg.Add(1)
		go t.handler(conn, peerEntry)
	}

	// nothing was started by this call, so report why instead of waiting
	if connected == 0 && firstErr != nil {
		return firstErr
	}

	t.wg.Wait()
	return nil
}
// handler drives one peer connection from start-up to teardown. It opens a
// connection manager on the given TCP connection, then runs a send loop in a
// separate goroutine and a receive loop in the calling goroutine until either
// of them reports an error. On every exit path the TCP connection is closed
// and the wait group slot taken by the caller is released.
// @tcpConnection: established connection to the peer
// @peer: peer on the other end (zero value for incoming connections)
func (t *PeerContactManager) handler(tcpConnection net.Conn, peer Peer) {
	// Deferred so no exit path can skip the cleanup; defers run in reverse
	// order, so the connection is closed before the wait group is released.
	defer t.wg.Done()
	defer tcpConnection.Close()

	fmt.Printf("connection to %v spawned\n", peer.IP)

	//open up a new connection manager
	manager := NewConnectionManager(&t.pieceManager, t.msgQueueMax, t.in, t.out)

	//start up the connection
	// NOTE(review): the meaning of 120 and 2 is defined by StartConnection — confirm
	if err := manager.StartConnection(tcpConnection, peer, t.tInfo, 120, 2); err != nil {
		return
	}

	// sendErr is buffered so the send loop can report its error and exit even
	// if the receive loop is no longer listening; done tells the send loop
	// that the receive loop has ended. Using two channels avoids both loops
	// blocking on a send to each other.
	sendErr := make(chan error, 1)
	done := make(chan struct{})

	//send loop
	go func() {
		for {
			if err := manager.SendNextMessage(); err != nil {
				sendErr <- err
				return
			}
			select {
			case <-done:
				return
			default:
			}
		}
	}()

	//receive loop
receiveLoop:
	for {
		if err := manager.ReceiveNextMessage(); err != nil {
			break receiveLoop
		}
		select {
		case <-sendErr:
			// a plain break would only leave the select, not the loop
			break receiveLoop
		case <-t.in: //just unchoke all peers
			t.out <- true
		default:
		}
	}

	close(done)
	manager.StopConnection()
}
// GetProgress returns the three progress counters of the piece manager. The
// constructor assigns them, in this order, to the tracker's Uploaded,
// Downloaded and Left fields.
func (t *PeerContactManager) GetProgress() (int, int, int) {
	uploaded, downloaded, left := t.pieceManager.GetProgress()
	return uploaded, downloaded, left
}
// StopDownload asks the piece manager to save its progress and returns the
// result of that call. It does not stop any running goroutines.
func (t *PeerContactManager) StopDownload() error {
	// TODO: stop the connection goroutines here?
	err := t.pieceManager.SaveProgress()
	return err
}
// StartIncoming opens a TCP listener on the given port on all network
// interfaces and hands every accepted connection to incomingHandler in its
// own goroutine. It only returns when listening or accepting fails; the
// listener is closed on return.
// @port: port to open the listener on
// returns: the error from net.Listen or from Accept
func (t *PeerContactManager) StartIncoming(port uint32) error {
	// an empty host yields ":<port>", i.e. listen on all interfaces
	address := net.JoinHostPort("", strconv.Itoa(int(port)))

	listener, err := net.Listen("tcp", address)
	if err != nil {
		return err
	}
	defer listener.Close()

	for {
		peerConn, acceptErr := listener.Accept()
		if acceptErr != nil {
			return acceptErr
		}
		// released by handler when the connection ends
		t.wg.Add(1)
		go t.incomingHandler(peerConn)
	}
}
// incomingHandler logs the local and remote address of an accepted
// connection and then runs the regular connection handler on it with a
// zero-value Peer.
func (t *PeerContactManager) incomingHandler(conn net.Conn) {
	local := conn.LocalAddr().String()
	remote := conn.RemoteAddr().String()
	fmt.Println(local, " Got connection from ", remote)

	var unknownPeer Peer
	t.handler(conn, unknownPeer)
}