-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbittorrent.cl
1108 lines (964 loc) · 34.2 KB
/
bittorrent.cl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
(in-package :user)
(eval-when (compile load eval)
(require :aserve)
(use-package :net.aserve.client))
;; Size in bytes of one block: the unit in which piece data is
;; requested from peers (see alloc-request-1).
(defparameter *blocksize* (* 16 1024))
;; http://wiki.theory.org/BitTorrentSpecification says that
;; 5 is a good value for 32KB blocksizes... so 10 must be a good
;; value for 16KB blocksizes [for a 5mbps link with 50ms latency]
;; Cap on the number of unanswered requests kept open to a single peer
;; (checked in maybe-submit-requests).
(defparameter *max-outstanding-requests* 10)
;; Upload ceiling that wait-for-upload-transfer-rate compares the
;; measured rate against.
(defparameter *max-upload-rate* 20) ;; KB/sec
(defmacro setf-bit (place bit)
  "Expand into a form that turns on bit number BIT of the integer held in PLACE.
PLACE appears twice in the expansion, so its subforms are evaluated twice."
  (let ((mask `(ash 1 ,bit)))
    `(setf ,place (logior ,place ,mask))))
(defmacro clearf-bit (place bit)
  "Expand into a form that turns off bit number BIT of the integer held in PLACE.
PLACE appears twice in the expansion, so its subforms are evaluated twice."
  (let ((mask `(ash 1 ,bit)))
    `(setf ,place (logandc2 ,place ,mask))))
;; State for one torrent: metadata read from the .torrent file plus
;; the bookkeeping shared by all peer connections.
(defstruct torrent
  (lock (mp:make-process-lock)) ;; taken via mp:with-process-lock around shared state
  files ;; list of 'f' structs
  basedir ;; directory holding the files ("." for a single-file torrent)
  num-pieces ;; length of the "pieces" hash string divided by 20
  piece-length ;; "piece length" value from the info dictionary
  pieces ;; array of 'piece' structs, built by compute-pieces
  announce ;; "announce" value; the URL probe-tracker requests
  info-hash ;; 20-character latin1 string from make-info-hash
  id ;; our own peer id, from make-id
  (have 0) ;; integer bitfield; bit n is set once piece n has been verified
  total-size ;; sum of the lengths of all files
  (uploaded 0) ;; reported to the tracker as "uploaded"
  (downloaded 0) ;; reported to the tracker as "downloaded"
  peers ;; list of 'peer' structs
  blocks-per-piece ;; piece-length divided by *blocksize*
  seed ;; set to t when every piece is present
  (period-timestamp (get-universal-time)) ;; start of the current rate-measurement period
  (uploaded-this-period 0) ;; bytes sent by send-piece this period
  (downloaded-this-period 0) ;; bytes received by process-piece-msg this period
  (upload-rate-lock (mp:make-process-lock))) ;; serializes waiters in wait-for-upload-transfer-rate
;; One file belonging to a torrent.
(defstruct f
  length ;; "length" value from the torrent metadata
  path ;; list of path components, relative to the torrent's basedir
  stream) ;; stream opened by open-torrent-files; nil after close-torrent
;; Maps a contiguous run of bytes in a piece onto a run of bytes in a
;; file.  A piece that spans several files has one filepiece per file
;; (see compute-pieces).
(defstruct filepiece
  file ;; the 'f' struct holding the bytes
  file-offset ;; where the run starts within that file
  len ;; number of bytes in the run
  piece-offset) ;; where the run starts within the piece
;; Per-piece download state.  The two bitfields are indexed by block
;; number within the piece.
(defstruct piece
  hash ;; 20 octets taken from the torrent's "pieces" string
  filepieces ;; list of 'filepiece' structs, in piece order
  buf ;; buffer allocated by process-piece-msg; set to nil once the piece is written
  (have 0) ;; bitfield
  (requested 0)) ;; bitfield
;; One block request sent to a peer (see alloc-request-1).
(defstruct request
  index ;; piece index
  offset ;; byte offset of the block within the piece
  len) ;; number of bytes requested
;; State for one remote peer.
(defstruct peer
  sock ;; stream read via (peer-sock peer) by send-piece and choke
  id ;; may not be available (or relevant)
  addr ;; address as supplied by the tracker
  port ;; port as supplied by the tracker
  have ;; integer bitfield of the peer's pieces; nil until a BITFIELD or HAVE arrives
  (haves-reported 0) ;; bitfield of our pieces already announced to this peer
  status ;; handle-connection sets :connected, :handshake-failed or :hung-up
  sent-requests ;; list of 'request' structs awaiting a PIECE reply
  (choked t) ;; true while we are choking the peer
  (interested nil) ;; true after we have sent INTERESTED
  (choking-us t) ;; true while the peer is choking us
  (interested-in-us nil) ;; true after the peer has sent INTERESTED
  (bytes-received-this-period 0) ;; used by get-interested-peers to rank peers
  do-unchoke) ;; NOTE(review): not referenced in the visible part of the file
(defun peer-name (peer)
  "Return a display string for PEER of the form address:port."
  (let ((addr (peer-addr peer))
        (port (peer-port peer)))
    (format nil "~a:~d" addr port)))
(defun make-info-hash (info)
  "Return the SHA-1 digest of the bencoded INFO dictionary as a
20-character string, one latin1 character per digest octet."
  (let ((digest (sha1-string (bencode info) :return :usb8)))
    (octets-to-string digest :end 20 :external-format :latin1)))
;; Read the .torrent file FILENAME and return a fully initialized
;; torrent struct: metadata decoded, files created on disk if
;; needed, existing data hashed, and file streams opened.
;;
;; Every file name taken from the torrent metadata is passed through
;; clean-filename before it is used as a path.  That includes the
;; "name" of a single-file torrent, which would otherwise let a
;; crafted torrent write outside the current directory.
(defun open-torrent (filename)
  (let* ((dict (decode-bencoding (file-contents filename :element-type 'usb8)))
         (info (dict-get "info" dict))
         (tor (make-torrent :announce (dict-get "announce" dict)
                            :id (make-id)
                            :info-hash (make-info-hash info)
                            :piece-length (dict-get "piece length" info))))
    ;; Each piece contributes a 20-octet SHA-1 hash to "pieces".
    (setf (torrent-num-pieces tor) (/ (length (dict-get "pieces" info)) 20))
    (setf (torrent-blocks-per-piece tor)
          (/ (torrent-piece-length tor) *blocksize*))
    (let ((files (dict-get "files" info)))
      (if* files
         then ;; multi-file torrent
              (setf (torrent-basedir tor)
                    (clean-filename (dict-get "name" info)))
              (let ((total 0)
                    res)
                (dolist (file files)
                  (push (make-f :length (dict-get "length" file)
                                :path (mapcar #'clean-filename (dict-get "path" file)))
                        res)
                  (incf total (dict-get "length" file)))
                (setf (torrent-files tor) (nreverse res))
                (setf (torrent-total-size tor) total))
         else ;; single file torrent
              (setf (torrent-basedir tor) ".")
              (setf (torrent-files tor)
                    (list (make-f :length (dict-get "length" info)
                                  ;; Sanitized just like multi-file path components.
                                  :path (list (clean-filename (dict-get "name" info))))))
              (setf (torrent-total-size tor) (dict-get "length" info))))
    (compute-pieces tor (dict-get "pieces" info))
    ;; A freshly created base directory cannot contain any data yet,
    ;; so the hash scan is only needed when it already existed.
    (let ((fresh (prep-torrent-files tor)))
      (if* (not fresh)
         then (construct-have tor)))
    (open-torrent-files tor)
    tor))
(defun open-torrent-files (tor)
  "Open every file of TOR for reading and writing in place and store
the resulting stream in the file's stream slot."
  (dolist (file (torrent-files tor))
    (let ((stream (open (f-filename tor file)
                        :direction :io
                        :if-exists :overwrite)))
      (setf (f-stream file) stream))))
;; Close the stream of every file of TOR and clear the stream slots.
;; Files whose stream slot is already nil are skipped, so the function
;; is safe on a torrent that was only partly opened or that has
;; already been closed.
(defun close-torrent (tor)
  (dolist (f (torrent-files tor))
    (let ((stream (f-stream f)))
      (when stream
        (close stream)
        (setf (f-stream f) nil)))))
(defmacro with-torrent ((tor filename) &body body)
  "Bind TOR to the torrent opened from FILENAME, run BODY, and call
close-torrent on it when BODY exits, normally or not.  The call to
open-torrent itself happens outside the protected region."
  (let ((cleanup `(close-torrent ,tor)))
    `(let ((,tor (open-torrent ,filename)))
       (unwind-protect
            (progn ,@body)
         ,cleanup))))
;; Characters allowed in a file name component: anything alphanumericp
;; accepts, plus '.', '_' and '-'.  The intent is the POSIX portable
;; filename set, which should be safe on unix and windows.
;; NOTE(review): alphanumericp may also accept non-ASCII letters and
;; digits, depending on the implementation.
(defun valid-filename-char-p (char)
  (or (alphanumericp char)
      (and (find char "._-" :test #'char=)
           t)))
(defun clean-filename (filename)
  "Return a fresh copy of FILENAME in which every character rejected by
valid-filename-char-p is replaced with an underscore.  Signals an
error when FILENAME is exactly \".\" or \"..\"."
  ;; Refuse the two names that would escape or alias a directory.
  (when (string= filename ".")
    (error "Invalid filename '.' in torrent file"))
  (when (string= filename "..")
    (error "Invalid filename '..' in torrent file"))
  (let ((cleaned (copy-seq filename)))
    (loop for i from 0 below (length cleaned)
          unless (valid-filename-char-p (schar cleaned i))
            do (setf (schar cleaned i) #\_))
    cleaned))
(defun list-to-filename (comps)
  "Join the path components in COMPS with a slash between each pair."
  (let ((separator #\/))
    (list-to-delimited-string comps separator)))
;; Make sure the file described by F exists under directory BASE.
;;
;; Intermediate directories named in F's path are created when
;; missing.  If the file already exists its size must equal F's
;; length, otherwise an error is signalled.  If it does not exist it
;; is created at full size by writing a single zero byte at the last
;; position.  A zero-length file is created empty; seeking to
;; position -1 for it would be an error.
(defun prep-file (base f)
  (let ((comps (list base))
        (path (f-path f))
        (len (f-length f)))
    ;; Walk every component except the last, creating directories.
    ;; COMPS is kept in reverse order, hence the REVERSE calls.
    (while (> (length path) 1)
      (push (pop path) comps)
      (let ((dir-name (list-to-filename (reverse comps))))
        (if* (not (probe-file dir-name))
           then (format t "making ~a/~%" dir-name)
                (make-directory dir-name))))
    ;; The remaining component is the file name itself.
    (push (pop path) comps)
    (let ((filename (list-to-filename (reverse comps))))
      (if* (probe-file filename)
         then (if (/= len (file-length filename))
                  (error
                   "~a exists but it is not the right size (~d). Aborting" filename
                   len))
         else (format t "making ~a~%" filename)
              (with-open-file (f filename :direction :output)
                (when (plusp len)
                  (file-position f (1- len))
                  (write-byte 0 f)))))))
(defun prep-torrent-files (tor)
  "Create TOR's base directory when it is missing, then prepare each of
its files with prep-file.  Returns true when the base directory had to
be created, nil when it was already there."
  (let* ((base (torrent-basedir tor))
         (fresh (not (probe-file base))))
    (when fresh
      (format t "making ~a/~%" base)
      (make-directory base))
    (dolist (file (torrent-files tor))
      (prep-file base file))
    fresh))
(defun make-id ()
  "Return a 20-character peer id: the fixed prefix -GG0100- followed by
twelve random decimal digits."
  (let ((digits (make-string 12)))
    (dotimes (i 12)
      (setf (schar digits i) (digit-char (random 10))))
    (concatenate 'string "-GG0100-" digits)))
(defun f-filename (tor f)
  "Return the name of file F of torrent TOR: TOR's base directory
followed by F's path components, joined with slashes."
  (let ((components (cons (torrent-basedir tor) (f-path f))))
    (list-to-delimited-string components #\/)))
;; Rebuild TOR's 'have' bitfield from the data already on disk.
;;
;; The files are read in torrent order into a buffer of one piece
;; length.  Each time the buffer fills, its SHA-1 is compared with
;; the expected hash of the current piece.  The buffer fill level
;; carries over from one file to the next, so a piece may be
;; assembled from several files.  Progress is printed as one
;; character per piece: '*' for a match, '.' for a mismatch.  When
;; every piece matches, the torrent is marked as a seed.
(defun construct-have (tor)
  (format t "Scanning files...~%")
  (let* ((have 0)
         (piece-length (torrent-piece-length tor))
         (pieces (torrent-pieces tor))
         (buf (make-usb8 piece-length))
         (pos 0)    ;; number of octets of the current piece already in buf
         (index 0)) ;; index of the piece currently being assembled
    (dolist (file (torrent-files tor))
      (with-open-file (f (f-filename tor file))
        (loop
          ;; read-vector returns the index just past the last octet it
          ;; stored; no progress means this file is exhausted.
          (let ((stop (read-vector buf f :start pos)))
            (if (= stop pos)
                (return)) ;; EOF
            (if* (= stop piece-length)
               then (if* (equalp (sha1-string buf :return :usb8)
                                 (piece-hash (aref pieces index)))
                       then (write-char #\*)
                            (setf-bit have index)
                       else (write-char #\.))
                    (finish-output)
                    (incf index)
                    (setf pos 0)
               else ;; Partial piece so far; keep filling from here.
                    (setf pos stop))))))
    ;; Compute the final hash (if there is one)
    (if* (not (zerop pos))
       then (if* (equalp (sha1-string buf :end pos :return :usb8)
                         (piece-hash (aref pieces index)))
               then (write-char #\*)
                    (setf-bit have index)
               else (write-char #\.)))
    (format t "~%Scan complete.~%")
    (setf (torrent-have tor) have)
    (when (= (logcount have) (torrent-num-pieces tor))
      (format t "Seeding.~%")
      (setf (torrent-seed tor) t))))
;; Disabled by the #+ignore below.  It would record, for each file,
;; the index of the piece in which the file starts and the offset
;; within that piece.  The f-start-piece and f-start-piece-offset
;; accessors it uses are not slots of the 'f' struct defined above.
#+ignore
(defun compute-file-locations (tor)
  (let* ((pos 0)
         (index 0)
         (piece-length (torrent-piece-length tor)))
    (dolist (file (torrent-files tor))
      (let* ((endpos (+ pos (f-length file)))
             (endindex (truncate (/ endpos piece-length))))
        (setf (f-start-piece file) index)
        (setf (f-start-piece-offset file) (mod pos piece-length))
        (setf index endindex)
        (setf pos endpos)))))
;; Build the array of piece structs for TOR and store it in the
;; torrent's 'pieces' slot.
;;
;; PIECES-STRING is the "pieces" value from the torrent metadata: the
;; 20-octet SHA-1 hashes of all pieces, concatenated.  Each piece is
;; given its hash and a list of filepieces that maps the piece's
;; bytes onto the torrent's files in order.  A piece that crosses a
;; file boundary gets one filepiece per file it touches.
;;
;; The length check comes first so that a malformed hash list gives
;; the intended error rather than a failure while allocating the
;; array with a non-integer size.
(defun compute-pieces (tor pieces-string)
  (if (not (zerop (mod (length pieces-string) 20)))
      (error "hash list is not a multiple of 20"))
  (let* ((files (torrent-files tor))
         (file (pop files))
         (num-pieces (torrent-num-pieces tor))
         (pieces (make-array num-pieces))
         (file-offset 0)
         (file-bytes-remaining (f-length file))
         (piece-length (torrent-piece-length tor))
         (string-pos 0))
    (dotimes (n num-pieces)
      (let ((piece-offset 0)
            (piece-bytes-remaining piece-length)
            fps)
        (loop
          ;; Take as much as both the current file and the current
          ;; piece still have room for.
          (let* ((used (min file-bytes-remaining piece-bytes-remaining))
                 (fp (make-filepiece :file file
                                     :file-offset file-offset
                                     :len used
                                     :piece-offset piece-offset)))
            (push fp fps)
            (incf file-offset used)
            (incf piece-offset used)
            (decf file-bytes-remaining used)
            (decf piece-bytes-remaining used)
            ;; Current file exhausted: advance to the next one, if any.
            (when (zerop file-bytes-remaining)
              (when (setf file (pop files))
                (setf file-offset 0)
                (setf file-bytes-remaining (f-length file))))
            ;; Stop when the piece is full or there is no more data.
            (if (or (null file) (zerop piece-bytes-remaining))
                (return))))
        (setf (aref pieces n)
              (make-piece
               :filepieces (nreverse fps)
               :hash (string-to-octets (subseq pieces-string string-pos
                                               (+ string-pos 20))
                                       :null-terminate nil
                                       :external-format :latin1)))
        (incf string-pos 20)))
    (setf (torrent-pieces tor) pieces)))
;; Send an announce request for TOR to its tracker and return the
;; list of peer structs parsed from the reply.
;;
;; EVENT, when supplied, is sent as the "event" query parameter.  It
;; has to be spliced into the query as a one-element list holding the
;; pair; splicing the bare dotted pair would leave "event" as a loose
;; string in the list with the value as a dotted tail.
;;
;; Signals an error if the tracker reports a failure or if the
;; "peers" value is neither a string (compact format) nor a list.
;; Other fields of the reply are only printed.
(defun probe-tracker (tor &key event)
  (let ((data (do-http-request (torrent-announce tor)
                :query `(("info_hash" . ,(torrent-info-hash tor))
                         ("peer_id" . ,(torrent-id tor))
                         ("port" . "2706")
                         ("uploaded" . ,(torrent-uploaded tor))
                         ("downloaded" . ,(torrent-downloaded tor))
                         ("left" . ,(- (torrent-total-size tor)
                                       (torrent-downloaded tor)))
                         ("compact" . "1")
                         ,@(if event `(("event" . ,event)))))))
    (setf data (decode-bencoding data))
    (if (dict-get "failure reason" data)
        (error "Tracker reported failure: ~a" (dict-get "failure reason" data)))
    (if (dict-get "warning message" data)
        ;; Newline added so the warning does not run into the next line.
        (format t "Tracker reported warning: ~a~%" (dict-get "warning message" data)))
    (format t "Interval: ~a seconds~%" (dict-get "interval" data))
    (if (dict-get "min interval" data)
        (format t "Min interval: ~a seconds~%" (dict-get "min interval" data)))
    (if (dict-get "tracker id" data)
        (format t "Tracker ID: ~a~%" (dict-get "tracker id" data)))
    (format t "Seeders: ~s~%" (dict-get "complete" data))
    (format t "Leechers: ~s~%" (dict-get "incomplete" data))
    (let ((peers (dict-get "peers" data)))
      (if* (stringp peers)
         then ;; compact format
              (parse-compact-peers peers)
       elseif (listp peers)
         then ;; full format
              (parse-peers peers)
         else (error "Unexpected peers data: ~s" peers)))))
(defun parse-peers (peers)
  "Turn the tracker's full-format peer list PEERS (a list of
dictionaries) into a list of peer structs.  The result is in the
reverse of the input order."
  (let ((parsed '()))
    (loop for entry in peers
          do (push (make-peer :id (dict-get "peer id" entry)
                              :addr (dict-get "ip" entry)
                              :port (dict-get "port" entry))
                   parsed))
    parsed))
(defun parse-compact-peers (peers)
  "Turn the tracker's compact peer string PEERS into a list of peer
structs.  Every six characters describe one peer: four address octets
and two port octets, most significant first.  The result is in the
reverse of the input order.  Signals an error when the length of PEERS
is not a multiple of six."
  (unless (zerop (rem (length peers) 6))
    (error "compact peers list has invalid format (not a multiple of 6)"))
  (flet ((octet (pos)
           (char-code (schar peers pos))))
    (let ((result '()))
      (loop for base from 0 below (length peers) by 6
            do (let ((ip (logior (ash (octet (+ 0 base)) 24)
                                 (ash (octet (+ 1 base)) 16)
                                 (ash (octet (+ 2 base)) 8)
                                 (ash (octet (+ 3 base)) 0)))
                     (port (logior (ash (octet (+ 4 base)) 8)
                                   (ash (octet (+ 5 base)) 0))))
                 (push (make-peer :addr (socket:ipaddr-to-dotted ip)
                                  :port port)
                       result)))
      result)))
;; Perform the BitTorrent handshake with PEER over STREAM on behalf
;; of torrent TOR.
;;
;; Our half is sent first: protocol-name length, protocol name, eight
;; zero reserved bytes, info-hash and our peer id.  The peer's half is
;; then read in the same layout.  The peer id read from the wire is
;; stored in PEER's id slot.
;;
;; Returns t when the peer's info-hash equals ours, nil otherwise.
;; The protocol name sent by the peer is read but not checked.
(defun do-handshake (peer tor stream)
  (let ((proto "BitTorrent protocol"))
    (write-byte (length proto) stream)
    (write-string proto stream)
    ;; reserved bytes
    (dotimes (n 8)
      (write-byte 0 stream))
    ;; info-hash
    (write-string (torrent-info-hash tor) stream)
    ;; peer-id
    (write-string (torrent-id tor) stream)
    (finish-output stream))
  (let* ((pstrlen (read-byte stream))
         (pstr (make-string pstrlen))
         (info-hash (make-string 20))
         (peer-id (make-string 20)))
    (dotimes (n pstrlen)
      (setf (schar pstr n) (read-char stream)))
    ;;(format t "Peer protocol: ~a~%" pstr)
    ;; reserved bytes
    (dotimes (n 8)
      (read-byte stream))
    ;; info-hash
    (dotimes (n 20)
      (setf (schar info-hash n) (read-char stream)))
    ;; peer-id
    (dotimes (n 20)
      (setf (schar peer-id n) (read-char stream)))
    ;; Record the id the peer announced, not the peer struct itself.
    (setf (peer-id peer) peer-id)
    (if* (string= info-hash (torrent-info-hash tor))
       then ;;(format t "info-hash matches.~%")
            t
       else ;;(format t "info-hash does not match.~%")
            nil)))
(defmacro with-socket-1 ((sock) &body body)
  "Run BODY, then close SOCK whatever happens: first an ordinary close,
then a close with :abort t.  Errors from either close are ignored.
SOCK is evaluated once per close."
  (let ((graceful `(ignore-errors (close ,sock)))
        (forced `(ignore-errors (close ,sock :abort t))))
    `(unwind-protect
          (progn ,@body)
       ,graceful
       ,forced)))
;; Convert TOR's integer 'have' bitfield into the octet vector sent in
;; a BITFIELD message.
;;
;; Piece 0 maps to the most significant bit of the first octet, piece
;; 1 to the next bit, and so on.  Unused low bits of the last octet
;; stay zero.  Returns a vector of (ceiling num-pieces 8) octets.
(defun bitfield-to-bytes (tor)
  (declare (optimize (speed 3) (safety 0)))
  (let* ((have (torrent-have tor))
         (num-bits (torrent-num-pieces tor))
         (len (ceiling (/ num-bits 8)))
         (buf (make-usb8 len :initial-element 0))
         (mask #x80)
         (pos 0)
         (byte 0))
    (declare (usb8 mask byte)
             (ausb8 buf)
             (fixnum num-bits len pos))
    (dotimes (n num-bits)
      (if (logbitp n have)
          (setf byte (logior byte mask)))
      ;; MASK walks from the high bit down; when it falls off the low
      ;; end the octet is complete and gets stored.
      (when (zerop (setf mask (ash mask -1)))
        (setf mask #x80)
        (setf (aref buf pos) byte)
        (incf pos)
        (setf byte 0)))
    ;; When num-bits is not a multiple of 8 the loop ends in the
    ;; middle of an octet.  Store that partial octet too; otherwise the
    ;; last few pieces would never be advertised.
    (if (/= mask #x80)
        (setf (aref buf pos) byte))
    buf))
(defun write-complete-vector (vec stream &key (start 0) (end (length vec)))
  "Write elements START up to END of VEC to STREAM, calling write-vector
repeatedly until everything has been written.  Signals an error when a
call to write-vector makes no progress."
  (loop with pos = start
        while (< pos end)
        do (let ((next (write-vector vec stream :start pos :end end)))
             (when (= next pos)
               (error "write failed"))
             (setf pos next))))
(defun read-complete-vector (vec stream &key (start 0) (end (length vec)))
  "Fill elements START up to END of VEC from STREAM, calling read-vector
repeatedly until the range is full.  When a call to read-vector makes no
progress, the current process is killed instead of signalling an error."
  (loop with pos = start
        while (< pos end)
        do (let ((next (read-vector vec stream :start pos :end end)))
             (when (= next pos)
               (mp:process-kill sys:*current-process*)
               #+ignore(error "Unexpected EOF"))
             (setf pos next))))
;; Return the unsigned 32-bit integer stored in the first four
;; elements of VEC, most significant octet first (element 0 supplies
;; bits 24-31, element 3 supplies bits 0-7).
(defun get-uint32 (vec)
  (declare (optimize (speed 3) (safety 0))
           (ausb8 vec))
  ;; For best compilation, the order here matters.
  (logior (logior
           (ash (aref vec 1) 16)
           (ash (aref vec 2) 8)
           (aref vec 3))
          (ash (aref vec 0) 24)))
(defun set-uint32 (vec value)
  "Store the low 32 bits of VALUE in the first four elements of VEC,
most significant octet first.  Returns the low octet of VALUE."
  (declare (optimize (speed 3) (safety 0))
           (ausb8 vec))
  (setf (aref vec 0) (ldb (byte 8 24) value)
        (aref vec 1) (ldb (byte 8 16) value)
        (aref vec 2) (ldb (byte 8 8) value)
        (aref vec 3) (ldb (byte 8 0) value)))
(defun write-uint32 (stream value)
  "Write VALUE to STREAM as four octets, most significant first."
  (declare (optimize (speed 3)))
  (let ((octets (make-usb8 4)))
    ;; The scratch vector does not outlive this call.
    (declare (dynamic-extent octets))
    (set-uint32 octets value)
    (write-complete-vector octets stream)))
(defun read-uint32 (stream)
  "Read four octets from STREAM and return them as an unsigned 32-bit
integer, most significant octet first."
  (declare (optimize (speed 3)))
  (let ((octets (make-usb8 4)))
    ;; The scratch vector does not outlive this call.
    (declare (dynamic-extent octets))
    (read-complete-vector octets stream)
    (get-uint32 octets)))
;; call with lock held
;; NOTE(review): which lock is meant is not stated here -- confirm.
;;
;; Send a BITFIELD message (id 5) whose payload is the octet vector
;; BYTES.  The length prefix counts the message id plus the payload.
;; The payload goes out through write-complete-vector, like every
;; other multi-octet socket write in this file, so that a short write
;; cannot silently truncate the message.
(defun write-bitfield (stream bytes)
  (write-uint32 stream (1+ (length bytes)))
  (write-byte 5 stream) ;; 'bitfield' message
  (write-complete-vector bytes stream)
  (finish-output stream))
(defun read-bitfield (stream len tor)
  "Read LEN octets from STREAM and decode them as a piece bitfield for
TOR.  Piece 0 is the most significant bit of the first octet.  Returns
an integer in which bit n is set when piece n is marked present.  Only
the first (torrent-num-pieces tor) bits are examined."
  (let ((octets (make-usb8 len))
        (have 0))
    (read-complete-vector octets stream)
    (dotimes (n (torrent-num-pieces tor))
      (multiple-value-bind (octet-index bit-in-octet) (floor n 8)
        ;; Bits are numbered from the high end of each octet.
        (when (logbitp (- 7 bit-in-octet) (aref octets octet-index))
          (setf-bit have n))))
    have))
;; Call with torrent locked.
(defun compute-wants-bitfield (peer tor)
  "Return the bitfield of pieces that PEER has and TOR lacks, or 0 when
PEER's have slot is still nil."
  (let ((theirs (peer-have peer)))
    (if theirs
        (logandc2 theirs (torrent-have tor))
        0)))
;; Return the length in bytes of piece INDEX of TOR.
;;
;; Every piece has the nominal piece length except possibly the last,
;; which holds whatever remains of the total size.  When the total
;; size is an exact multiple of the piece length, the last piece is a
;; full piece.  Taking the remainder of total by piece length, as
;; this function used to, gave 0 in that case.
(defun compute-piece-len (tor index)
  (let* ((nominal-len (torrent-piece-length tor))
         (total (torrent-total-size tor))
         ;; Bytes from the start of this piece to the end of the data.
         (remaining (- total (* index nominal-len))))
    (min nominal-len remaining)))
;; Returns two values for piece INDEX: how many blocks it is made of,
;; and the length of its final block.  For every piece except the
;; last these are torrent-blocks-per-piece and *blocksize*.
(defun compute-piece-info (tor index)
  (let* ((piece-len (compute-piece-len tor index))
         (blocks (ceiling piece-len *blocksize*))
         ;; Every block before the final one is a full block.
         (full-blocks (max 0 (1- blocks))))
    (values blocks
            (- piece-len (* full-blocks *blocksize*)))))
;; Call with torrent locked.
(defmacro piece-bitfield-full-p (bf max-pop)
  "Expand into a test that the number of set bits in BF equals MAX-POP."
  (list '= (list 'logcount bf) max-pop))
;; Call with torrent locked
(defun find-incomplete-wanted-piece (wants tor)
  "Return the lowest piece index whose bit is set in WANTS and for which
at least one block has not been requested yet, or nil when there is no
such piece."
  (loop for n from 0 below (torrent-num-pieces tor)
        when (and (logbitp n wants)
                  (not (piece-bitfield-full-p
                        (piece-requested (aref (torrent-pieces tor) n))
                        (compute-piece-info tor n))))
          return n))
(defun index-of-first-unset-bit (value)
  "Return the position of the lowest bit of the integer VALUE that is
zero, counting from 0."
  (loop for n from 0
        unless (logbitp n value)
          return n))
(defun alloc-bit (value)
  "Claim the lowest zero bit of the integer VALUE.  Returns two values:
the position of that bit, and VALUE with that bit set."
  (let* ((index (index-of-first-unset-bit value))
         (updated (logior value (ash 1 index))))
    (values index updated)))
;; Call with torrent locked
(defun alloc-request-1 (tor index)
  "Reserve the lowest not-yet-requested block of piece INDEX and return
a request struct describing it.  The block is marked in the piece's
'requested' bitfield.  Signals an error when every block of the piece
has already been requested."
  (let ((piece (aref (torrent-pieces tor) index)))
    (multiple-value-bind (blocks last-block-len)
        (compute-piece-info tor index)
      (multiple-value-bind (blocknum requested)
          (alloc-bit (piece-requested piece))
        ;; Sanity check.
        (when (>= blocknum blocks)
          (error "allocated more than the available # of blocks"))
        (setf (piece-requested piece) requested)
        ;; Only the final block of a piece may be shorter than *blocksize*.
        (let ((final-block-p (= (1+ blocknum) blocks)))
          (make-request :index index
                        :offset (* *blocksize* blocknum)
                        :len (if final-block-p
                                 last-block-len
                                 *blocksize*)))))))
;; Pick one block that PEER can supply and TOR still needs, record it
;; as an outstanding request on PEER, and send the REQUEST message on
;; SOCK.  Returns t when a request was sent, nil when there is nothing
;; left to ask this peer for.
;;
;; compute-wants-bitfield, find-incomplete-wanted-piece and
;; alloc-request-1 are all marked "call with torrent locked", so the
;; selection and bookkeeping run under the torrent lock.  The socket
;; write happens after the lock is released so that a slow peer does
;; not hold up the other connections.
(defun alloc-and-submit-request (sock tor peer)
  (let ((req (mp:with-process-lock ((torrent-lock tor))
               (let* ((wants (compute-wants-bitfield peer tor))
                      (index (find-incomplete-wanted-piece wants tor)))
                 (when index
                   (let ((new-req (alloc-request-1 tor index)))
                     ;;(format t "Allocated request: ~a~%" new-req)
                     (push new-req (peer-sent-requests peer))
                     new-req))))))
    (when req
      (submit-request sock req)
      t)))
(defun submit-request (sock req)
  "Send a REQUEST message (id 6) for REQ on SOCK: a length prefix of 13,
the message id, then the piece index, block offset and block length as
32-bit integers."
  (write-uint32 sock 13)
  (write-byte 6 sock) ;; request
  (dolist (field (list (request-index req)
                       (request-offset req)
                       (request-len req)))
    (write-uint32 sock field))
  (finish-output sock))
(defun express-interest (peer sock)
  "Send an INTERESTED message (id 2) on SOCK and record on PEER that we
are now interested."
  (let ((message-id 2)) ;; interested
    (write-uint32 sock 1) ;; payload is the id octet only
    (write-byte message-id sock)
    (finish-output sock))
  (setf (peer-interested peer) t))
(defun express-disinterest (peer sock)
  "Send a NOT INTERESTED message (id 3) on SOCK and record on PEER that
we are no longer interested."
  (let ((message-id 3)) ;; not interested
    (write-uint32 sock 1) ;; payload is the id octet only
    (write-byte message-id sock)
    (finish-output sock))
  (setf (peer-interested peer) nil))
(defun process-bitfield-msg (peer msglen tor sock)
  "Handle a BITFIELD message with a payload of MSGLEN octets: read it
from SOCK and store the decoded bitfield in PEER's have slot.  Signals
an error when PEER already has a bitfield or when MSGLEN does not match
the number of pieces of TOR."
  (when (peer-have peer)
    (error "peer sent bitfield message but we already have it"))
  (let ((expected (ceiling (torrent-num-pieces tor) 8)))
    (unless (= msglen expected)
      (error "peer sent bitfield message with invalid length")))
  (setf (peer-have peer) (read-bitfield sock msglen tor)))
(defun process-have-msg (peer msglen tor sock)
  "Handle a HAVE message: read the piece index from SOCK and set that
bit in PEER's have bitfield, starting from 0 when PEER had none.
Signals an error when MSGLEN is not 4 or the index is out of range."
  (unless (= msglen 4)
    (error "invalid HAVE message length"))
  (let ((index (read-uint32 sock)))
    (when (>= index (torrent-num-pieces tor))
      (error "Invalid index"))
    (setf (peer-have peer)
          (logior (or (peer-have peer) 0)
                  (ash 1 index)))))
;; Handle a PIECE message whose remaining length is MSGLEN.
;;
;; The payload is a piece index, a byte offset within the piece, and
;; the block data.  The block is accepted only when it matches a
;; request previously sent to PEER (same index, offset and length);
;; otherwise the data is read and thrown away.  An accepted block is
;; read straight into the piece's buffer, the byte counters are
;; updated, and finish-request does the rest.
(defun process-piece-msg (peer msglen tor sock)
  ;; Subtract the index and offset fields; what is left is block data.
  (decf msglen 8)
  (if (< msglen 1)
      (error "invalid piece data len: ~d~%" msglen))
  (let* ((index (read-uint32 sock))
         (begin (read-uint32 sock))
         (req (find-matching-req peer index begin msglen)))
    #+ignore
    (format t "~a: PIECE ~d, o: ~d, l: ~d~%"
            (peer-name peer) index begin msglen)
    (if* (null req)
       then ;; Unexpected data. Discard.
            (format t "Not expecting this piece. Discarding.~%")
            (dotimes (n msglen)
              (read-byte sock))
       else (let ((piece (aref (torrent-pieces tor) index))
                  buf)
              ;; Allocate the piece buffer on first use, under the lock.
              (mp:with-process-lock ((torrent-lock tor))
                (setf buf (or (piece-buf piece)
                              (setf (piece-buf piece)
                                    (make-usb8 (compute-piece-len tor index))))))
              ;; The socket read itself runs without the torrent lock.
              (read-complete-vector buf sock
                                    :start begin :end (+ begin msglen))
              (incf (peer-bytes-received-this-period peer) msglen)
              (mp:with-process-lock ((torrent-lock tor))
                (incf (torrent-downloaded-this-period tor) msglen)
                (finish-request req piece peer tor))))))
(defun find-matching-req (peer index begin len)
  "Return the first request outstanding on PEER whose piece index,
offset and length equal INDEX, BEGIN and LEN, or nil when there is
none."
  (find-if #'(lambda (req)
               (and (= (request-index req) index)
                    (= (request-offset req) begin)
                    (= (request-len req) len)))
           (peer-sent-requests peer)))
;; Record that the block described by REQ has arrived for PIECE.
;;
;; The request is removed from PEER's outstanding list and the
;; block's bit is set in the piece's 'have' bitfield.  When that
;; completes the piece, its hash is checked.  On success the piece is
;; written to disk, its buffer is released, its bit is set in the
;; torrent's 'have' bitfield, and the torrent is marked as a seed if
;; that was the last missing piece.  On failure both block bitfields
;; of the piece are reset so that it gets requested again.
(defun finish-request (req piece peer tor)
  ;; process-piece-msg calls this while already inside
  ;; with-process-lock on the same torrent lock.
  (mp:with-process-lock ((torrent-lock tor))
    (let* ((blocknum (/ (request-offset req) *blocksize*))
           (index (request-index req))
           (blocks (compute-piece-info tor index)))
      (setf (peer-sent-requests peer)
            (delete req (peer-sent-requests peer)))
      ;;(format t "finish-request, block: ~d~%" blocknum)
      ;;(clearf-bit (piece-requested piece) blocknum)
      (setf-bit (piece-have piece) blocknum)
      ;; check to see if we have the whole piece now.
      (when (piece-bitfield-full-p (piece-have piece) blocks)
        ;;(format t "Verifying piece ~a~%" index)
        (if* (verify-piece piece)
           then (format t "*** Piece ~a verified ***~%" index)
                (write-piece piece)
                (setf (piece-buf piece) nil)
                (setf-bit (torrent-have tor) index)
                (if (= (logcount (torrent-have tor)) (torrent-num-pieces tor))
                    (setf (torrent-seed tor) t))
           else ;; Verify failed
                (format t "Verify failed. Discarding piece.~%")
                (setf (piece-requested piece) 0)
                (setf (piece-have piece) 0))))))
(defun verify-piece (piece)
  "Return true when the SHA-1 of PIECE's buffer equals PIECE's expected
hash."
  (let ((expected (piece-hash piece))
        (actual (sha1-string (piece-buf piece) :return :usb8)))
    (equalp expected actual)))
;; Call with torrent locked
;;
;; Write PIECE's buffer to disk.  Each filepiece of the piece names a
;; file, an offset in that file and a length; the matching slice of
;; the buffer is written there.  Slices are taken from the buffer in
;; filepiece order.  write-complete-vector is used, as for the other
;; multi-octet writes in this file, so that a short write cannot
;; leave part of a slice unwritten.
(defun write-piece (piece)
  (let ((buf (piece-buf piece))
        (pos 0)) ;; start of the next slice within buf
    (dolist (fp (piece-filepieces piece))
      (let* ((file (filepiece-file fp))
             (stream (f-stream file))
             (len (filepiece-len fp)))
        (file-position stream (filepiece-file-offset fp))
        (write-complete-vector buf stream :start pos :end (+ pos len))
        (finish-output stream)
        (incf pos len)))))
(defun cancel-outstanding-sent-requests (peer tor)
  "Forget every request still outstanding on PEER.  For each one, the
block's bit is cleared in its piece's 'requested' bitfield, which makes
the block available to be requested again.  PEER's request list is then
emptied.  Runs under the torrent lock."
  (mp:with-process-lock ((torrent-lock tor))
    (loop for req in (peer-sent-requests peer)
          do (let ((piece (aref (torrent-pieces tor) (request-index req)))
                   (blocknum (/ (request-offset req) *blocksize*)))
               (clearf-bit (piece-requested piece) blocknum)))
    (setf (peer-sent-requests peer) nil)))
(defun process-choke-msg (peer tor)
  "Handle a CHOKE message: log it, note that PEER is choking us, and
drop every request outstanding on PEER."
  (let ((name (peer-name peer)))
    (format t "~a: CHOKE~%" name))
  (setf (peer-choking-us peer) t)
  (cancel-outstanding-sent-requests peer tor))
(defun process-interest-changes (peer tor sock)
  "Bring our interest in PEER in line with what it can offer: send NOT
INTERESTED when we are interested but it has nothing we lack, and send
INTERESTED when it has something we lack and we have not said so yet."
  (let* ((wants (mp:with-process-lock ((torrent-lock tor))
                  (compute-wants-bitfield peer tor)))
         (nothing-wanted (zerop wants))
         (interested (peer-interested peer)))
    (cond ((and nothing-wanted interested)
           (express-disinterest peer sock))
          ((and (not nothing-wanted) (not interested))
           (express-interest peer sock)))))
(defun maybe-submit-requests (peer tor sock)
  "When we are interested in PEER and it is not choking us, keep sending
block requests until *max-outstanding-requests* are outstanding or
there is nothing more to request."
  (unless (or (not (peer-interested peer))
              (peer-choking-us peer))
    (loop while (and (< (length (peer-sent-requests peer))
                        *max-outstanding-requests*)
                     (alloc-and-submit-request sock tor peer)))))
(defun report-haves (peer tor sock)
  "Send a HAVE message (id 4) to PEER for every piece we hold that we
have not announced to it yet and that it does not hold itself.  Each
announced piece is recorded in PEER's haves-reported bitfield."
  (let* ((ours (mp:with-process-lock ((torrent-lock tor))
                 (torrent-have tor)))
         (unreported (logandc2 ours (peer-haves-reported peer)))
         ;; No need to tell the peer about pieces it already has.
         (to-report (logandc2 unreported (or (peer-have peer) 0))))
    (unless (zerop to-report)
      (loop for n from 0 below (torrent-num-pieces tor)
            when (logbitp n to-report)
              do (progn
                   (write-uint32 sock 5)
                   (write-byte 4 sock) ;; HAVE
                   (write-uint32 sock n)
                   (setf-bit (peer-haves-reported peer) n)))
      (finish-output sock))))
(defmacro with-socket-error-handler (() &body body)
  "Run BODY with two kinds of error absorbed: any socket-error, and any
errno-stream-error whose code is excl::*epipe*.  An errno-stream-error
with a different code is signalled again."
  (let ((clauses '((socket-error (c)
                    (declare (ignore c)))
                   (errno-stream-error (c)
                    (if (/= (stream-error-code c) excl::*epipe*)
                        (error c))))))
    `(handler-case (progn ,@body)
       ,@clauses)))
;; Run BODY inside with-socket-error-handler and, whatever happens,
;; clean up afterwards: log the disconnect, cancel the requests still
;; outstanding on PEER, and remove PEER from TOR's peer list under the
;; torrent lock.
;;
;; PEER and TOR are forms supplied by the caller.  Every use in the
;; expansion is unquoted from those arguments, so the macro does not
;; depend on the caller's variables happening to be named PEER and TOR.
(defmacro with-peer-connection ((peer tor) &body body)
  `(unwind-protect
        (with-socket-error-handler () ,@body)
     ;; cleanup
     (format t "~a disconnecting.~%" (peer-name ,peer))
     (cancel-outstanding-sent-requests ,peer ,tor)
     (mp:with-process-lock ((torrent-lock ,tor))
       (setf (torrent-peers ,tor)
             (delete ,peer (torrent-peers ,tor))))))
;; TODO: send keep-alive every 2 minutes.
;; TODO: Disconnect from seeds if we're a seed as well.
;; TODO: Use rarest-first download strategy.
;; Run the whole conversation with PEER over SOCK for torrent TOR.
;;
;; After the handshake the per-connection state on PEER is reset and
;; our bitfield is sent if we hold any piece.  The main loop then
;; repeats: adjust our interest, send block requests, announce newly
;; acquired pieces, and wait up to five seconds for an incoming
;; message, which is dispatched on its id.  The loop ends when the
;; peer hangs up.  SOCK is closed on the way out by with-socket-1.
;; PEER's status slot records the outcome: :handshake-failed,
;; :connected, or :hung-up.
(defun handle-connection (sock tor peer)
  (with-socket-1 (sock)
    ;; Any error during the handshake counts as a failed handshake.
    (when (not (ignore-errors (do-handshake peer tor sock)))
      (setf (peer-status peer) :handshake-failed)
      (return-from handle-connection))
    (setf (peer-status peer) :connected)
    ;; Initialize
    (setf (peer-have peer) nil)
    (setf (peer-sent-requests peer) nil)
    (setf (peer-choked peer) t)
    (setf (peer-interested peer) nil)
    (setf (peer-choking-us peer) t)
    (setf (peer-interested-in-us peer) nil)
    ;; If we have at least one piece, transmit our bitmap.
    (let (have bytes)
      ;; Snapshot 'have' and build the octets under the lock; the
      ;; socket write happens after the lock is released.
      (mp:with-process-lock ((torrent-lock tor))
        (setf have (torrent-have tor))
        (if (not (zerop have))
            (setf bytes (bitfield-to-bytes tor))))
      (when bytes
        ;;(format t "Transmitting our bitmap.~%")
        (write-bitfield sock bytes)
        ;; Everything in the snapshot has now been announced.
        (setf (peer-haves-reported peer) have)))
    (with-peer-connection (peer tor)
      (loop
        (handler-case (process-interest-changes peer tor sock)
          (socket-error (c)
            (declare (ignore c))
            (setf (peer-status peer) :hung-up)
            (return))
          ;; Any other error is signalled again unchanged.
          (error (c)
            (error c)))
        (maybe-submit-requests peer tor sock)
        (report-haves peer tor sock)
        (when (mp:wait-for-input-available sock :timeout 5)
          ;; Every message starts with a 32-bit length prefix.
          (let ((msglen (ignore-errors (read-uint32 sock)))
                msg)
            (if* (null msglen)
               then ;; Peer hung up.
                    (setf (peer-status peer) :hung-up)
                    (return)
             elseif (= msglen 0)
               then ;;(format t "Got keep-alive message.~%")
                    nil
               else (setf msg (read-byte sock))
                    ;; MSGLEN now counts only the payload after the id.
                    (decf msglen)
                    (case msg
                      (0 (process-choke-msg peer tor))
                      (1 ;;(format t "UNCHOKE~%")
                       (setf (peer-choking-us peer) nil))
                      (2 (format t "~a: INTERESTED~%" (peer-name peer))
                       (setf (peer-interested-in-us peer) t))
                      (3 (format t "~a: NOT INTERESTED~%" (peer-name peer))
                       (setf (peer-interested-in-us peer) nil))
                      (4 (process-have-msg peer msglen tor sock))
                      (5 (process-bitfield-msg peer msglen tor sock))
                      (6 (process-request-msg peer msglen tor sock))
                      (7 (process-piece-msg peer msglen tor sock))
                      (8 (process-cancel-msg peer msglen tor sock))
                      (t (error "Unsupported message type: ~d" msg))))))))))
;; CANCEL needs no action here: requests are served synchronously as
;; they arrive, so there is never a queued request left to cancel.
;; The payload is still read so that the stream stays in step.
(defun process-cancel-msg (peer msglen tor sock)
  "Consume the payload of a CANCEL message from SOCK and return its
length field.  Signals an error when MSGLEN is not 12."
  (declare (ignore peer tor))
  (unless (= msglen 12)
    (error "Invalid cancel message len"))
  (let* ((index (read-uint32 sock))
         (offset (read-uint32 sock))
         (length (read-uint32 sock)))
    (declare (ignorable index offset))
    #+ignore
    (format t "CANCEL i: ~d, o: ~d, l: ~d~%" index offset length)
    length))
(defun process-request-msg (peer msglen tor sock)
  "Handle a REQUEST message: read the piece index, offset and length
from SOCK, validate them against TOR, and send the block with
send-piece when PEER is not choked.  Signals an error when MSGLEN is not
12, the length exceeds 128 KB, the index is out of range, or the
requested range does not lie inside the piece."
  (unless (= msglen 12)
    (error "Invalid request message len."))
  (let* ((index (read-uint32 sock))
         (offset (read-uint32 sock))
         (length (read-uint32 sock)))
    #+ignore
    (format t "~a: REQUEST i: ~d, o: ~d, l: ~d~%"
            (peer-name peer) index offset length)
    (when (> length #.(* 128 1024))
      (error "Request length of ~d is too large" length))
    (when (>= index (torrent-num-pieces tor))
      (error "Request for invalid index: ~d" index))
    (let ((plen (compute-piece-len tor index)))
      (when (>= offset plen)
        (error "Request for invalid piece offset: ~d" offset))
      (when (> (+ offset length) plen)
        (error "Request for invalid section of piece (offset+len exceeds size of piece)")))
    ;; Only heed requests that were supplied by unchoked peers.
    (unless (peer-choked peer)
      (send-piece peer tor index offset length))))
;; Return the upload rate of TOR for the current measurement period,
;; in KB per second: bytes uploaded this period divided by 1024 and
;; by the seconds elapsed since the period began.
;;
;; The elapsed time is taken as at least one second, which avoids
;; dividing by zero at the very start of a period.  This function
;; used to sleep for a second in that case while holding the torrent
;; lock, which stalled every other connection for that second.
(defun get-current-upload-rate (tor)
  (mp:with-process-lock ((torrent-lock tor))
    (let* ((now (get-universal-time))
           (elapsed (max 1 (- now (torrent-period-timestamp tor)))))
      (/ (torrent-uploaded-this-period tor) 1024.0 elapsed))))
;; Holding the upload-rate lock for the whole wait means only one
;; process at a time polls the rate; the others queue on the lock.
(defun wait-for-upload-transfer-rate (tor)
  "Block, polling once a second, until TOR's current upload rate is no
longer above *max-upload-rate*."
  (mp:with-process-lock ((torrent-upload-rate-lock tor))
    (loop while (> (get-current-upload-rate tor) *max-upload-rate*)
          do (sleep 1))))
;; Send LENGTH bytes of piece INDEX, starting at byte OFFSET within
;; the piece, to PEER as a PIECE message (id 7).
;;
;; The call first waits until the upload rate allows another
;; transfer.  The data is read from the files on disk through the
;; piece's filepieces, so a block that crosses a file boundary is
;; assembled from several files.  File reads run under the torrent
;; lock; socket writes do not.  Each chunk written is added to the
;; torrent's uploaded-this-period counter.
;;
;; The caller is expected to have validated INDEX, OFFSET and LENGTH
;; (process-request-msg does).
(defun send-piece (peer tor index offset length)
  (let* ((sock (peer-sock peer))
         (piece (aref (torrent-pieces tor) index))
         (fps (piece-filepieces piece))
         (fp (pop fps)))
    (wait-for-upload-transfer-rate tor)
    #+ignore
    (format t "Sending piece ~d (~d/~d) to ~a~%"
            index offset length (peer-name peer))
    ;; Length prefix: id octet + index + offset + the data itself.
    (write-uint32 sock (+ 9 length))
    (write-byte 7 sock)
    (write-uint32 sock index)
    (write-uint32 sock offset)
    ;; Find the filepiece which contains 'offset'.  OFFSET stays
    ;; relative to the piece during the search, so it can be compared
    ;; with each filepiece's piece-relative end.  Adjusting it while
    ;; searching would compare a filepiece-relative offset with a
    ;; piece-relative bound and stop on the wrong filepiece.
    (while (>= offset (+ (filepiece-piece-offset fp) (filepiece-len fp)))
      (setf fp (pop fps)))
    ;; From here on OFFSET is relative to the start of the filepiece.
    (decf offset (filepiece-piece-offset fp))
    ;;(format t "starting with fp: ~s~%" fp)
    ;;(format t "offset within that file: ~d~%" offset)
    (while (> length 0)
      ;;(format t "bytes remaining to send: ~d~%" length)
      (let* ((count (min (- (filepiece-len fp) offset) length))
             (stream (f-stream (filepiece-file fp)))
             (buf (make-usb8 count)))
        ;;(format t "want to read ~d bytes from fp.~%" count)
        ;; Seek and read together under the lock so that nothing else
        ;; moves the file position in between.
        (mp:with-process-lock ((torrent-lock tor))
          (file-position stream (+ (filepiece-file-offset fp) offset))
          (read-complete-vector buf stream :end count))
        (write-complete-vector buf sock :end count)
        (mp:with-process-lock ((torrent-lock tor))
          (incf (torrent-uploaded-this-period tor) count))
        ;;(format t "wrote ~d bytes to socket.~%" count)
        (decf length count)
        ;; Any further data starts at the beginning of the next filepiece.
        (setf fp (pop fps))
        (setf offset 0)
        (if (and fp (> length 0))
            (format t "Moved to fp ~a, offset 0.~%" fp))))
    (finish-output sock)))
;; Number of peers to unchoke at a time; see the algorithm notes that
;; follow.
(defparameter *max-uploads* 4)
;; non-seed Alg:
;; Unchoke the (1- *max-uploads*) fastest interested uploaders. This
;; provides reciprocation
;; Unchoke uninterested uploaders who are faster than uploaders in the
;; prior group. This step may be unnecessary if we simply call
;; this alg any time interest changes (does that result in too much
;; work?)
;; choke everyone else (don't send choke message if they are already
;; choked).
;; TODO: When choking, kill off any pending requests they have made.
;; The above is not done yet. For the time being, we're just going
;; to select the first *max-uploads* fastest interested uploaders.
;; Result is ordered fastest first.  Call with torrent locked.
(defun get-interested-peers (tor)
  "Return a fresh list of the peers of TOR that are connected and
interested in us, sorted by bytes received from them this period,
largest first."
  (flet ((candidate-p (peer)
           (and (eq (peer-status peer) :connected)
                (peer-interested-in-us peer)))
         (faster-p (a b)
           (> (peer-bytes-received-this-period a)
              (peer-bytes-received-this-period b))))
    ;; REVERSE gives a fresh list, which SORT is free to destroy, in
    ;; the same pre-sort order that pushing onto a list would produce.
    (sort (reverse (remove-if-not #'candidate-p (torrent-peers tor)))
          #'faster-p)))
(defun reset-received-counts (tor)
  "Set the bytes-received-this-period counter of every peer of TOR back
to zero."
  (loop for peer in (torrent-peers tor)
        do (setf (peer-bytes-received-this-period peer) 0)))
(defun choke (peer)
  "Log and send a CHOKE message (id 0) on PEER's socket, then record
that PEER is choked."
  (format t "Sending choke to ~a~%" (peer-name peer))
  (let ((sock (peer-sock peer))
        (message-id 0)) ;; choke
    (write-uint32 sock 1) ;; payload is the id octet only
    (write-byte message-id sock)
    (finish-output sock))
  (setf (peer-choked peer) t))
(defun unchoke (peer)
(format t "Sending unchoke to ~a~%" (peer-name peer))
(let ((sock (peer-sock peer)))
(write-uint32 sock 1)