Skip to content
This repository has been archived by the owner on Jan 12, 2024. It is now read-only.

Commit

Permalink
fix -compress bug
Browse files Browse the repository at this point in the history
  • Loading branch information
unknown committed Jul 27, 2017
1 parent 15c5305 commit d9de809
Showing 1 changed file with 21 additions and 51 deletions.
72 changes: 21 additions & 51 deletions pipe/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,18 +128,17 @@ type UDPMakeSession struct {

xor string

compressCache []byte
compressSendChan chan []byte
fecDataShards int
fecParityShards int
fecW *reedsolomon.Encoder
fecR *reedsolomon.Encoder
fecRCacheTbl map[uint]*fecInfo
fecWCacheTbl *fecInfo
fecWriteId uint //uint16
fecSendC uint
fecSendL int
fecRecvId uint
compressCache []byte
fecDataShards int
fecParityShards int
fecW *reedsolomon.Encoder
fecR *reedsolomon.Encoder
fecRCacheTbl map[uint]*fecInfo
fecWCacheTbl *fecInfo
fecWriteId uint //uint16
fecSendC uint
fecSendL int
fecRecvId uint

confuseSeed int
}
Expand Down Expand Up @@ -200,8 +199,8 @@ func (l *Listener) inner_loop() {
//don't close pipe, just drop this data
continue
}
//log.Println("decompress", len(_b), n)
buf = _b
//log.Println("decompress", n, len(_b))
}
session.DoAction2("input", buf, len(buf))
}
Expand All @@ -228,24 +227,6 @@ func (l *Listener) inner_loop() {
}
if int(fec)&(1<<7) != 0 {
session.compressCache = make([]byte, zappy.MaxEncodedLen(ReadBufferSize))
session.compressSendChan = make(chan []byte, 100)
go func() {
for {
select {
case b := <-session.compressSendChan:
enc, er := zappy.Encode(session.compressCache, b)
if er != nil {
log.Println("compress error", er.Error())
go session.Close()
break
}
//log.Println("compress", len(b), len(enc))
session.sock.WriteTo(enc, session.remote)
case <-session.quitChan:
return
}
}
}()
}
l.sessionsLock.Lock()
l.sessions[addr] = session
Expand Down Expand Up @@ -380,24 +361,6 @@ func DialTimeoutWithSetting(addr string, timeout int, setting *KcpSetting, ds, p
}
if comp {
session.compressCache = make([]byte, zappy.MaxEncodedLen(ReadBufferSize))
session.compressSendChan = make(chan []byte, 100)
go func() {
for {
select {
case b := <-session.compressSendChan:
enc, er := zappy.Encode(session.compressCache, b)
if er != nil {
log.Println("compress error", er.Error())
go session.Close()
break
}
//log.Println("compress", len(b), len(enc))
session.sock.WriteTo(enc, session.remote)
case <-session.quitChan:
return
}
}
}()
}
_timeout := int(timeout / 2)
if _timeout < 5 {
Expand Down Expand Up @@ -520,7 +483,14 @@ out:

func (session *UDPMakeSession) writeTo(b []byte) {
if session.compressCache != nil && len(b) > 7 {
session.compressSendChan <- b
enc, er := zappy.Encode(session.compressCache, b)
if er != nil {
log.Println("compress error", er.Error())
go session.Close()
return
}
//log.Println("compress", len(b), len(enc))
session.sock.WriteTo(enc, session.remote)
} else {
session.sock.WriteTo(b, session.remote)
}
Expand Down Expand Up @@ -712,8 +682,8 @@ func (session *UDPMakeSession) loop() {
//don't close pipe, just drop data
continue
}
//log.Println("decompress", len(_b), n)
buf = _b
//log.Println("decompress", n, len(_b))
}
session.DoAction2("input", buf, len(buf))
}
Expand Down

0 comments on commit d9de809

Please sign in to comment.