Skip to content

Commit

Permalink
Incoming ip should be included into message ID
Browse files Browse the repository at this point in the history
  • Loading branch information
buger committed Mar 19, 2014
1 parent 0fb1d83 commit b800896
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 16 deletions.
2 changes: 1 addition & 1 deletion output_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (o *HTTPOutput) sendRequest(client *http.Client, data []byte) {
if len(o.methods) > 0 && !o.methods.Contains(request.Method) {
return
}

// Change HOST of original request
URL := o.address + request.URL.Path + "?" + request.URL.RawQuery

Expand Down
23 changes: 12 additions & 11 deletions raw_socket_listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
// Since we can't use default TCP libraries RAWTCPLitener implements own TCP layer
// TCP packets is parsed using tcp_packet.go, and flow control is managed by tcp_message.go
type Listener struct {
messages map[uint32]*TCPMessage // buffer of TCPMessages waiting to be send
messages map[string]*TCPMessage // buffer of TCPMessages waiting to be send

c_packets chan *TCPPacket
c_messages chan *TCPMessage // Messages ready to be send to client
Expand All @@ -33,7 +33,7 @@ func NewListener(addr string, port string) (rawListener *Listener) {
rawListener.c_packets = make(chan *TCPPacket, 100)
rawListener.c_messages = make(chan *TCPMessage, 100)
rawListener.c_del_message = make(chan *TCPMessage, 100)
rawListener.messages = make(map[uint32]*TCPMessage)
rawListener.messages = make(map[string]*TCPMessage)

rawListener.addr = addr
rawListener.port, _ = strconv.Atoi(port)
Expand All @@ -50,7 +50,7 @@ func (t *Listener) listen() {
// If message ready for deletion it means that its also complete or expired by timeout
case message := <-t.c_del_message:
t.c_messages <- message
delete(t.messages, message.Ack)
delete(t.messages, message.ID)

// We need to use channels to process each packet to avoid data races
case packet := <-t.c_packets:
Expand All @@ -71,25 +71,25 @@ func (t *Listener) readRAWSocket() {

for {
// Note: ReadFrom receive messages without IP header
n, _, err := conn.ReadFrom(buf)
n, addr, err := conn.ReadFrom(buf)

if err != nil {
log.Println("Error:", err)
continue
}

if n > 0 {
t.parsePacket(buf[:n])
t.parsePacket(addr, buf[:n])
}
}
}

func (t *Listener) parsePacket(buf []byte) {
func (t *Listener) parsePacket(addr net.Addr, buf []byte) {
if t.isIncomingDataPacket(buf) {
new_buf := make([]byte, len(buf))
copy(new_buf, buf)

t.c_packets <- ParseTCPPacket(new_buf)
t.c_packets <- ParseTCPPacket(addr, new_buf)
}
}

Expand Down Expand Up @@ -119,15 +119,16 @@ func (t *Listener) isIncomingDataPacket(buf []byte) bool {
// For TCP message unique id is Acknowledgment number (see tcp_packet.go)
func (t *Listener) processTCPPacket(packet *TCPPacket) {
defer func() { recover() }()

var message *TCPMessage
m_id := packet.Addr.String() + strconv.Itoa(int(packet.Ack))

message, ok := t.messages[packet.Ack]
message, ok := t.messages[m_id]

if !ok {
// We sending c_del_message channel, so message object can communicate with Listener and notify it if message completed
message = NewTCPMessage(packet.Ack, t.c_del_message)
t.messages[packet.Ack] = message
message = NewTCPMessage(m_id, t.c_del_message)
t.messages[m_id] = message
}

// Adding packet to message
Expand Down
6 changes: 3 additions & 3 deletions raw_socket_listener/tcp_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ const MSG_EXPIRE = 200 * time.Millisecond
// Message can be compiled from unique packets with same message_id which sorted by sequence
// Message is received if we didn't receive any packets for 200ms
type TCPMessage struct {
Ack uint32 // Message ID
ID string // Message ID
packets []*TCPPacket

timer *time.Timer // Used for expire check
Expand All @@ -26,8 +26,8 @@ type TCPMessage struct {
}

// NewTCPMessage pointer created from a Acknowledgment number and a channel of messages readuy to be deleted
func NewTCPMessage(Ack uint32, c_del chan *TCPMessage) (msg *TCPMessage) {
msg = &TCPMessage{Ack: Ack}
func NewTCPMessage(ID string, c_del chan *TCPMessage) (msg *TCPMessage) {
msg = &TCPMessage{ID: ID}

msg.c_packets = make(chan *TCPPacket)
msg.c_del_message = c_del // used for notifying that message completed or expired
Expand Down
6 changes: 5 additions & 1 deletion raw_socket_listener/tcp_packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package raw_socket

import (
"encoding/binary"
"net"
"strconv"
"strings"
)
Expand Down Expand Up @@ -34,11 +35,14 @@ type TCPPacket struct {
Urgent uint16

Data []byte

Addr net.Addr
}

func ParseTCPPacket(b []byte) (p *TCPPacket) {
func ParseTCPPacket(addr net.Addr, b []byte) (p *TCPPacket) {
p = &TCPPacket{Data: b}
p.ParseBasic()
p.Addr = addr

return p
}
Expand Down

0 comments on commit b800896

Please sign in to comment.