Skip to content

Commit

Permalink
Merge branch 'master' into encoding-chunked
Browse files Browse the repository at this point in the history
Conflicts:
	output_http.go
  • Loading branch information
buger committed Jun 26, 2015
2 parents 390f75f + a059648 commit 534e9b2
Show file tree
Hide file tree
Showing 10 changed files with 93 additions and 67 deletions.
75 changes: 36 additions & 39 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ sudo gor --input-http :28019 --output-http "http://staging.com"

Then in your application you should send copy (e.g. like reverse proxy) all incoming requests to Gor http input.

### Following redirects
If you have a scenario where following redirects is usefull you can do it like with:

```
gor --input-tcp replay.local:28020 --output-http http://staging.com --output-http-redirects 10
```
The given example will follow up to 10 redirects per request.

## Advanced use

Expand Down Expand Up @@ -186,59 +193,49 @@ https://github.com/buger/gor/releases
`gor -h` output:
```
-cpuprofile="": write cpu profile to file
-memprofile="": write memory profile to this file
-input-dummy=[]: Used for testing outputs. Emits 'Get /' request every 1s
-input-file=[]: Read requests from file:
gor --input-file ./requests.gor --output-http staging.com
gor --input-file ./requests.gor --output-http staging.com
-input-http=[]: Read requests from HTTP, should be explicitly sent from your application:
# Listen for http on 9000
gor --input-http :9000 --output-http staging.com
-input-raw=[]: Capture traffic from given port (use RAW sockets and require *sudo* access):
# Capture traffic from 8080 port
gor --input-raw :8080 --output-http staging.com
# Capture traffic from 8080 port
gor --input-raw :8080 --output-http staging.com
-input-tcp=[]: Used for internal communication between Gor instances. Example:
# Receive requests from other Gor instances on 28020 port, and redirect output to staging
gor --input-tcp :28020 --output-http staging.com
# Receive requests from other Gor instances on 28020 port, and redirect output to staging
gor --input-tcp :28020 --output-http staging.com
-memprofile="": write memory profile to this file
-output-dummy=[]: Used for testing inputs. Just prints data coming from inputs.
-output-file=[]: Write incoming requests to file:
gor --input-raw :80 --output-file ./requests.gor
gor --input-raw :80 --output-file ./requests.gor
-output-http=[]: Forwards incoming requests to given http address.
# Redirect all incoming requests to staging.com address
gor --input-raw :80 --output-http http://staging.com
# Redirect all incoming requests to staging.com address
gor --input-raw :80 --output-http http://staging.com
-output-http-elasticsearch="": Send request and response stats to ElasticSearch:
gor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name'
gor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name'
-output-http-header=[]: Inject additional headers to http reqest:
gor --input-raw :8080 --output-http staging.com --output-http-header 'User-Agent: Gor'
gor --input-raw :8080 --output-http staging.com --output-http-header 'User-Agent: Gor'
-output-http-header-filter=[]: A regexp to match a specific header against. Requests with non-matching headers will be dropped:
gor --input-raw :8080 --output-http staging.com --output-http-header-filter api-version:^v1
gor --input-raw :8080 --output-http staging.com --output-http-header-filter api-version:^v1
-output-http-header-hash-filter=[]: Takes a fraction of requests, consistently taking or rejecting a request based on the FNV32-1A hash of a specific header. The fraction must have a denominator that is a power of two:
gor --input-raw :8080 --output-http staging.com --output-http-header-hash-filter user-id:1/4
gor --input-raw :8080 --output-http staging.com --output-http-header-hash-filter user-id:1/4
-output-http-method=[]: Whitelist of HTTP methods to replay. Anything else will be dropped:
gor --input-raw :8080 --output-http staging.com --output-http-method GET --output-http-method OPTIONS
-output-http-redirects=0: Enable how often redirects should be followed.
-output-http-rewrite-url=[]: Rewrite the requst url based on a mapping:
gor --input-raw :8080 --output-http staging.com --output-http-rewrite-url /xml_test/interface.php:/api/service.do
-output-http-stats=false: Report http output queue stats to console every 5 seconds.
-output-http-url-regexp=: A regexp to match requests against. Anything else will be dropped:
gor --input-raw :8080 --output-http staging.com --output-http-url-regexp ^www.
-output-http-workers=-1: Number of http output workers desired. Use default -1 for dynamic worker scaling. Gor will add http workers if its work queue starts getting too full and kill them .
-output-http-stats=false: If set to `true` it gives out queuing stats for the HTTP output every 5 seconds in the form latest,mean,max,count,count/second.
gor --input-raw :8080 --output-http staging.com --output-http-url-regexp ^www.
-output-http-workers=-1: Gor uses dynamic worker scaling by default. Enter a number to run a set number of workers.
-output-tcp=[]: Used for internal communication between Gor instances. Example:
# Listen for requests on 80 port and forward them to other Gor instance on 28020 port
gor --input-raw :80 --output-tcp replay.local:28020
-output-tcp-stats=false: If set to `true` it gives out queuing stats for the TCP output every 5 seconds in the form latest,mean,max,count,count/second.
# Listen for requests on 80 port and forward them to other Gor instance on 28020 port
gor --input-raw :80 --output-tcp replay.local:28020
-output-tcp-stats=false: Report TCP output queue stats to console every 5 seconds.
-split-output=false: By default each output gets same traffic. If set to `true` it splits traffic equally among all outputs.
-output-http-rewrite-url=[]: Rewrites the url in the request based on a mapping
gor --input-raw :8080 --output-http staging.com --output-http-rewrite-url /xml_test/interface.php:/api/service.do
-stats=false: Turn on queue stats output
-verbose=false: Turn on verbose/debug output
```

## Building from source
Expand Down
11 changes: 6 additions & 5 deletions input_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,19 @@ func (i *TCPInput) handleConnection(conn net.Conn) {

for {
buf, err := reader.ReadBytes('¶')
if err == io.EOF {
return
} else if err != nil {
log.Println("Unexpected error in input tcp connection", err)
return
}
buf_len := len(buf)
if buf_len > 0 {
new_buf_len := len(buf) - 2
if new_buf_len > 0 {
new_buf := make([]byte, new_buf_len)
copy(new_buf, buf[:new_buf_len])
i.data <- new_buf
if err != nil {
if err != io.EOF {
log.Printf("error: %s\n", err)
}
}
}
}
}
Expand Down
30 changes: 21 additions & 9 deletions output_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,14 @@ import (
"bufio"
"bytes"
"io"
"io/ioutil"
"log"
"net/http"
"net/http/httputil"
"net/url"
"strings"
"sync/atomic"
"time"
"io/ioutil"
"net/http/httputil"
)

type RedirectNotAllowed struct{}
Expand All @@ -21,8 +21,8 @@ func (e *RedirectNotAllowed) Error() string {
}

// customCheckRedirect disables redirects https://github.com/buger/gor/pull/15
func customCheckRedirect(req *http.Request, via []*http.Request) error {
if len(via) >= 0 {
func (o *HTTPOutput) customCheckRedirect(req *http.Request, via []*http.Request) error {
if len(via) >= o.redirectLimit {
return new(RedirectNotAllowed)
}
return nil
Expand All @@ -33,14 +33,18 @@ func ParseRequest(data []byte) (request *http.Request, err error) {
var body []byte

// Test if request have Transfer-Encoding: chunked
isChunked := bytes.Contains(data, []byte(": chunked\r\n"));
isChunked := bytes.Contains(data, []byte(": chunked\r\n"))

buf := bytes.NewBuffer(data)
reader := bufio.NewReader(buf)

// ReadRequest does not read POST bodies, we have to do it by ourseves
request, err = http.ReadRequest(reader)

if err != nil {
return
}

if request.Method == "POST" {
// This works, because ReadRequest method modify buffer and strips all headers, leaving only body
if isChunked {
Expand All @@ -61,12 +65,18 @@ func ParseRequest(data []byte) (request *http.Request, err error) {
const InitialDynamicWorkers = 10

type HTTPOutput struct {
// Keep this as first element of struct because it guarantees 64bit
// alignment. atomic.* functions crash on 32bit machines if operand is not
// aligned at 64bit. See https://github.com/golang/go/issues/599
activeWorkers int64

address string
limit int
queue chan []byte

activeWorkers int64
needWorker chan int
redirectLimit int

needWorker chan int

urlRegexp HTTPUrlRegexp
headerFilters HTTPHeaderFilters
Expand All @@ -81,7 +91,7 @@ type HTTPOutput struct {
queueStats *GorStat
}

func NewHTTPOutput(address string, headers HTTPHeaders, methods HTTPMethods, urlRegexp HTTPUrlRegexp, headerFilters HTTPHeaderFilters, headerHashFilters HTTPHeaderHashFilters, elasticSearchAddr string, outputHTTPUrlRewrite UrlRewriteMap) io.Writer {
func NewHTTPOutput(address string, headers HTTPHeaders, methods HTTPMethods, urlRegexp HTTPUrlRegexp, headerFilters HTTPHeaderFilters, headerHashFilters HTTPHeaderHashFilters, elasticSearchAddr string, outputHTTPUrlRewrite UrlRewriteMap, outputHTTPRedirects int) io.Writer {

o := new(HTTPOutput)

Expand All @@ -93,6 +103,8 @@ func NewHTTPOutput(address string, headers HTTPHeaders, methods HTTPMethods, url
o.headers = headers
o.methods = methods

o.redirectLimit = Settings.outputHTTPRedirects

o.urlRegexp = urlRegexp
o.headerFilters = headerFilters
o.headerHashFilters = headerHashFilters
Expand Down Expand Up @@ -138,7 +150,7 @@ func (o *HTTPOutput) WorkerMaster() {

func (o *HTTPOutput) Worker() {
client := &http.Client{
CheckRedirect: customCheckRedirect,
CheckRedirect: o.customCheckRedirect,
}

death_count := 0
Expand Down
12 changes: 6 additions & 6 deletions output_http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package main

import (
"io"
"io/ioutil"
"net"
"net/http"
"net/http/httputil"
_ "strings"
"sync"
"testing"
"time"
"io/ioutil"
"net/http/httputil"
_ "strings"
)

func startHTTP(cb func(*http.Request)) net.Listener {
Expand Down Expand Up @@ -76,7 +76,7 @@ func TestHTTPOutput(t *testing.T) {
wg.Done()
})

output := NewHTTPOutput(listener.Addr().String(), headers, methods, HTTPUrlRegexp{}, HTTPHeaderFilters{}, HTTPHeaderHashFilters{}, "", UrlRewriteMap{})
output := NewHTTPOutput(listener.Addr().String(), headers, methods, HTTPUrlRegexp{}, HTTPHeaderFilters{}, HTTPHeaderHashFilters{}, "", UrlRewriteMap{}, 0)

Plugins.Inputs = []io.Reader{input}
Plugins.Outputs = []io.Writer{output}
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestHTTPOutputChunkedEncoding(t *testing.T) {
wg.Done()
})

output := NewHTTPOutput(listener.Addr().String(), headers, methods, HTTPUrlRegexp{}, HTTPHeaderFilters{}, HTTPHeaderHashFilters{}, "", UrlRewriteMap{})
output := NewHTTPOutput(listener.Addr().String(), headers, methods, HTTPUrlRegexp{}, HTTPHeaderFilters{}, HTTPHeaderHashFilters{}, "", UrlRewriteMap{}, 0)

Plugins.Inputs = []io.Reader{input}
Plugins.Outputs = []io.Writer{output}
Expand Down Expand Up @@ -145,7 +145,7 @@ func BenchmarkHTTPOutput(b *testing.B) {
wg.Done()
})

output := NewHTTPOutput(listener.Addr().String(), headers, methods, HTTPUrlRegexp{}, HTTPHeaderFilters{}, HTTPHeaderHashFilters{}, "", UrlRewriteMap{})
output := NewHTTPOutput(listener.Addr().String(), headers, methods, HTTPUrlRegexp{}, HTTPHeaderFilters{}, HTTPHeaderHashFilters{}, "", UrlRewriteMap{}, 0)

Plugins.Inputs = []io.Reader{input}
Plugins.Outputs = []io.Writer{output}
Expand Down
14 changes: 12 additions & 2 deletions output_tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"io"
"log"
"net"
"time"
)

type TCPOutput struct {
Expand Down Expand Up @@ -32,11 +33,20 @@ func NewTCPOutput(address string) io.Writer {
}

func (o *TCPOutput) worker() {
conn, _ := o.connect(o.address)
conn, err := o.connect(o.address)
for ; err != nil; conn, err = o.connect(o.address) {
time.Sleep(2 * time.Second)
}

defer conn.Close()

for {
conn.Write(<-o.buf)
_, err := conn.Write(<-o.buf)
if err != nil {
log.Println("Worker failed on write, exitings and starting new worker")
go o.worker()
break
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,6 @@ func InitPlugins() {
}

for _, options := range Settings.outputHTTP {
registerPlugin(NewHTTPOutput, options, Settings.outputHTTPHeaders, Settings.outputHTTPMethods, Settings.outputHTTPUrlRegexp, Settings.outputHTTPHeaderFilters, Settings.outputHTTPHeaderHashFilters, Settings.outputHTTPElasticSearch, Settings.outputHTTPUrlRewrite)
registerPlugin(NewHTTPOutput, options, Settings.outputHTTPHeaders, Settings.outputHTTPMethods, Settings.outputHTTPUrlRegexp, Settings.outputHTTPHeaderFilters, Settings.outputHTTPHeaderHashFilters, Settings.outputHTTPElasticSearch, Settings.outputHTTPUrlRewrite, Settings.outputHTTPRedirects)
}
}
3 changes: 2 additions & 1 deletion raw_socket_listener/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,13 @@ func (t *Listener) listen() {

func (t *Listener) readRAWSocket() {
conn, e := net.ListenPacket("ip4:tcp", t.addr)
defer conn.Close()

if e != nil {
log.Fatal(e)
}

defer conn.Close()

buf := make([]byte, 4096*2)

for {
Expand Down
2 changes: 2 additions & 0 deletions settings.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type AppSettings struct {
outputHTTPElasticSearch string
outputHTTPWorkers int
outputHTTPStats bool
outputHTTPRedirects int
}

var Settings AppSettings = AppSettings{}
Expand Down Expand Up @@ -83,6 +84,7 @@ func init() {

flag.StringVar(&Settings.outputHTTPElasticSearch, "output-http-elasticsearch", "", "Send request and response stats to ElasticSearch:\n\tgor --input-raw :8080 --output-http staging.com --output-http-elasticsearch 'es_host:api_port/index_name'")
flag.Var(&Settings.outputHTTPUrlRewrite, "output-http-rewrite-url", "Rewrite the requst url based on a mapping:\n\tgor --input-raw :8080 --output-http staging.com --output-http-rewrite-url /xml_test/interface.php:/api/service.do")
flag.IntVar(&Settings.outputHTTPRedirects, "output-http-redirects", 0, "Enable how often redirects should be followed.")
}

func Debug(args ...interface{}) {
Expand Down
6 changes: 2 additions & 4 deletions settings_header_hash_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,8 @@ func (h *HTTPHeaderHashFilters) Set(value string) error {
panic("need positive numerators and denominators, with the former less than the latter.")
}

for test := den; test != 1; test /= 2 {
if test%2 == 1 {
return errors.New("must have a denominator which is a power of two.")
}
if den&(den-1) != 0 {
return errors.New("must have a denominator which is a power of two.")
}

var f headerHashFilter
Expand Down
5 changes: 5 additions & 0 deletions settings_header_hash_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ func TestHTTPHeaderHashFilters(t *testing.T) {
t.Error("Should error on HeaderIrrelevant:1/3")
}

err = filters.Set("Pow2Denom:1/31")
if err == nil {
t.Error("Should error on Pow2Denom:1/31")
}

req := http.Request{}
req.Header = make(map[string][]string)
req.Header.Add("Header1", "test3414")
Expand Down

0 comments on commit 534e9b2

Please sign in to comment.