Skip to content

Commit

Permalink
Migrate jaeger-agent implementation from internal repository (#61)
Browse files Browse the repository at this point in the history
  • Loading branch information
yurishkuro authored Mar 19, 2017
1 parent 32429cb commit 1465fd5
Show file tree
Hide file tree
Showing 43 changed files with 4,501 additions and 61 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ _site/
env/
Gemfile.lock
vendor/
examples/hotrod/hotrod
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fmt:
lint:
$(GOVET) $(PACKAGES)
@cat /dev/null > $(LINT_LOG)
@$(foreach pkg, $(PACKAGES), $(GOLINT) $(pkg) | grep -v thrift-gen >> $(LINT_LOG) || true;)
@$(foreach pkg, $(PACKAGES), $(GOLINT) $(pkg) | grep -v -e thrift-gen -e thrift-0.9.2 >> $(LINT_LOG) || true;)
@[ ! -s "$(LINT_LOG)" ] || (echo "Lint Failures" | cat - $(LINT_LOG) && false)
@$(GOFMT) -e -s -l $(ALL_SRC) > $(FMT_LOG)
@./scripts/updateLicenses.sh >> $(FMT_LOG)
Expand Down
35 changes: 35 additions & 0 deletions cmd/agent/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Jaeger Agent

`jaeger-agent` is a daemon program that runs on every host and receives
tracing information submitted by applications via Jaeger client
libraries.

## Structure

* Agent
* processor as ThriftProcessor
* server as TBufferedServer
* Thrift UDP Transport
* reporter as TCollectorReporter
* sampling server
* sampling manager as sampling.TCollectorProxy

### UDP Server

Listens on UDP transport, reads data as `[]byte` and forwards to
`processor` over channel. Processor has N workers that read from
the channel, convert to thrift-generated object model, and pass on
to the Reporter. `TCollectorReporter` submits the spans to remote
`tcollector` service.

### Sampling Server

An HTTP server handling request in the form

http://localhost:port/sampling?service=xxxx`

Delegates to `sampling.Manager` to get the sampling strategy.
`sampling.TCollectorProxy` implements `sampling.Manager` by querying
remote `tcollector` service. Then the server converts
thrift response from sampling manager into JSON and responds to clients.

83 changes: 83 additions & 0 deletions cmd/agent/app/agent.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package app

import (
"io"
"net"
"net/http"

"github.com/uber-go/zap"

"github.com/uber/jaeger/cmd/agent/app/processors"
)

// Agent is a composition of all services / components
type Agent struct {
processors []processors.Processor
samplingServer *http.Server
discoveryClient interface{}
logger zap.Logger
closer io.Closer
}

// NewAgent creates the new Agent.
func NewAgent(
processors []processors.Processor,
samplingServer *http.Server,
discoveryClient interface{},
logger zap.Logger,
) *Agent {
return &Agent{
processors: processors,
samplingServer: samplingServer,
discoveryClient: discoveryClient,
logger: logger,
}
}

// Run runs all of agent UDP and HTTP servers in separate go-routines.
// It returns an error when it's immediately apparent on startup, but
// any errors happening after starting the servers are only logged.
func (a *Agent) Run() error {
listener, err := net.Listen("tcp", a.samplingServer.Addr)
if err != nil {
return err
}
a.closer = listener
go func() {
if err := a.samplingServer.Serve(listener); err != nil {
a.logger.Error("sampling server failure", zap.Error(err))
}
}()
for _, processor := range a.processors {
go processor.Serve()
}
return nil
}

// Stop forces all agent go routines to exit.
func (a *Agent) Stop() {
for _, processor := range a.processors {
go processor.Stop()
}
a.closer.Close()
}
93 changes: 93 additions & 0 deletions cmd/agent/app/agent_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
// Copyright (c) 2017 Uber Technologies, Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.

package app

import (
"fmt"
"io/ioutil"
"net/http"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber-go/zap"
"github.com/uber/jaeger-lib/metrics"
)

func TestAgentStartError(t *testing.T) {
cfg := &Builder{}
agent, err := cfg.CreateAgent(metrics.NullFactory, zap.New(zap.NullEncoder()))
require.NoError(t, err)
agent.samplingServer.Addr = "bad-address"
assert.Error(t, agent.Run())
}

func TestAgentStartStop(t *testing.T) {
cfg := Builder{
Processors: []ProcessorConfiguration{
{
Model: jaegerModel,
Protocol: compactProtocol,
Server: ServerConfiguration{
HostPort: "127.0.0.1:0",
},
},
},
}
agent, err := cfg.CreateAgent(metrics.NullFactory, zap.New(zap.NullEncoder()))
require.NoError(t, err)
ch := make(chan error, 2)
go func() {
if err := agent.Run(); err != nil {
t.Errorf("error from agent.Run(): %s", err)
ch <- err
}
close(ch)
}()

url := fmt.Sprintf("http://%s/sampling?service=abc", agent.samplingServer.Addr)
httpClient := &http.Client{
Timeout: 100 * time.Millisecond,
}
for i := 0; i < 1000; i++ {
_, err := httpClient.Get(url)
if err == nil {
break
}
select {
case err := <-ch:
if err != nil {
t.Fatalf("error from agent: %s", err)
}
break
default:
time.Sleep(time.Millisecond)
}
}
resp, err := http.Get(url)
require.NoError(t, err)
body, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, "tcollector error: no peers available\n", string(body))
agent.Stop()
assert.NoError(t, <-ch)
}
Loading

0 comments on commit 1465fd5

Please sign in to comment.