From b800896b042e6f8cb271f37a3338d1451d8c914f Mon Sep 17 00:00:00 2001 From: Leonid Bugaev Date: Wed, 19 Mar 2014 19:33:24 +0600 Subject: [PATCH] Incoming ip should be included into message ID --- output_http.go | 2 +- raw_socket_listener/listener.go | 23 ++++++++++++----------- raw_socket_listener/tcp_message.go | 6 +++--- raw_socket_listener/tcp_packet.go | 6 +++++- 4 files changed, 21 insertions(+), 16 deletions(-) diff --git a/output_http.go b/output_http.go index a36440c71..f0c707fce 100644 --- a/output_http.go +++ b/output_http.go @@ -115,7 +115,7 @@ func (o *HTTPOutput) sendRequest(client *http.Client, data []byte) { if len(o.methods) > 0 && !o.methods.Contains(request.Method) { return } - + // Change HOST of original request URL := o.address + request.URL.Path + "?" + request.URL.RawQuery diff --git a/raw_socket_listener/listener.go b/raw_socket_listener/listener.go index c1b2669ff..1a416a893 100644 --- a/raw_socket_listener/listener.go +++ b/raw_socket_listener/listener.go @@ -15,7 +15,7 @@ import ( // Since we can't use default TCP libraries RAWTCPLitener implements own TCP layer // TCP packets is parsed using tcp_packet.go, and flow control is managed by tcp_message.go type Listener struct { - messages map[uint32]*TCPMessage // buffer of TCPMessages waiting to be send + messages map[string]*TCPMessage // buffer of TCPMessages waiting to be send c_packets chan *TCPPacket c_messages chan *TCPMessage // Messages ready to be send to client @@ -33,7 +33,7 @@ func NewListener(addr string, port string) (rawListener *Listener) { rawListener.c_packets = make(chan *TCPPacket, 100) rawListener.c_messages = make(chan *TCPMessage, 100) rawListener.c_del_message = make(chan *TCPMessage, 100) - rawListener.messages = make(map[uint32]*TCPMessage) + rawListener.messages = make(map[string]*TCPMessage) rawListener.addr = addr rawListener.port, _ = strconv.Atoi(port) @@ -50,7 +50,7 @@ func (t *Listener) listen() { // If message ready for deletion it means that its also complete or expired by timeout case message := <-t.c_del_message: t.c_messages <- message - delete(t.messages, message.Ack) + delete(t.messages, message.ID) // We need to use channels to process each packet to avoid data races case packet := <-t.c_packets: @@ -71,7 +71,7 @@ func (t *Listener) readRAWSocket() { for { // Note: ReadFrom receive messages without IP header - n, _, err := conn.ReadFrom(buf) + n, addr, err := conn.ReadFrom(buf) if err != nil { log.Println("Error:", err) @@ -79,17 +79,17 @@ func (t *Listener) readRAWSocket() { } if n > 0 { - t.parsePacket(buf[:n]) + t.parsePacket(addr, buf[:n]) } } } -func (t *Listener) parsePacket(buf []byte) { +func (t *Listener) parsePacket(addr net.Addr, buf []byte) { if t.isIncomingDataPacket(buf) { new_buf := make([]byte, len(buf)) copy(new_buf, buf) - t.c_packets <- ParseTCPPacket(new_buf) + t.c_packets <- ParseTCPPacket(addr, new_buf) } } @@ -119,15 +119,16 @@ func (t *Listener) isIncomingDataPacket(buf []byte) bool { // For TCP message unique id is Acknowledgment number (see tcp_packet.go) func (t *Listener) processTCPPacket(packet *TCPPacket) { defer func() { recover() }() - + var message *TCPMessage + m_id := packet.Addr.String() + strconv.Itoa(int(packet.Ack)) - message, ok := t.messages[packet.Ack] + message, ok := t.messages[m_id] if !ok { // We sending c_del_message channel, so message object can communicate with Listener and notify it if message completed - message = NewTCPMessage(packet.Ack, t.c_del_message) - t.messages[packet.Ack] = message + message = NewTCPMessage(m_id, t.c_del_message) + t.messages[m_id] = message } // Adding packet to message diff --git a/raw_socket_listener/tcp_message.go b/raw_socket_listener/tcp_message.go index 02ad03f1b..0b59bec21 100644 --- a/raw_socket_listener/tcp_message.go +++ b/raw_socket_listener/tcp_message.go @@ -15,7 +15,7 @@ const MSG_EXPIRE = 200 * time.Millisecond // Message can be compiled from unique packets with same message_id which sorted by sequence // Message is received if we didn't receive any packets for 200ms type TCPMessage struct { - Ack uint32 // Message ID + ID string // Message ID packets []*TCPPacket timer *time.Timer // Used for expire check @@ -26,8 +26,8 @@ type TCPMessage struct { } // NewTCPMessage pointer created from a Acknowledgment number and a channel of messages readuy to be deleted -func NewTCPMessage(Ack uint32, c_del chan *TCPMessage) (msg *TCPMessage) { - msg = &TCPMessage{Ack: Ack} +func NewTCPMessage(ID string, c_del chan *TCPMessage) (msg *TCPMessage) { + msg = &TCPMessage{ID: ID} msg.c_packets = make(chan *TCPPacket) msg.c_del_message = c_del // used for notifying that message completed or expired diff --git a/raw_socket_listener/tcp_packet.go b/raw_socket_listener/tcp_packet.go index 7b59c2e8a..35a3e4f70 100644 --- a/raw_socket_listener/tcp_packet.go +++ b/raw_socket_listener/tcp_packet.go @@ -2,6 +2,7 @@ package raw_socket import ( "encoding/binary" + "net" "strconv" "strings" ) @@ -34,11 +35,14 @@ type TCPPacket struct { Urgent uint16 Data []byte + + Addr net.Addr } -func ParseTCPPacket(b []byte) (p *TCPPacket) { +func ParseTCPPacket(addr net.Addr, b []byte) (p *TCPPacket) { p = &TCPPacket{Data: b} p.ParseBasic() + p.Addr = addr return p }