Add orchestrator for supporting multiple processes #208

Closed
60 commits
2c71b87
Add orchestrator for supporting multiple processes
dineshg13 Jun 25, 2023
bab5d8f
remove gops & use disover process
dineshg13 Jun 26, 2023
ad0ae97
Merge branch 'main' into dinesh.gurumurthy/multiprocess
dineshg13 Jun 26, 2023
6e6181a
Merge branch 'main' into dinesh.gurumurthy/multiprocess
dineshg13 Jun 27, 2023
24ccbd1
revert un-needed changes
dineshg13 Jun 28, 2023
b6386d4
Merge branch 'dinesh.gurumurthy/multiprocess' of github.com:dineshg13…
dineshg13 Jun 28, 2023
d196e4f
add changelog
dineshg13 Jun 28, 2023
729cf8d
Merge branch 'main' into dinesh.gurumurthy/multiprocess
dineshg13 Jul 3, 2023
d5c54a1
PR comments - add controllersettings
dineshg13 Jul 3, 2023
d4c3931
fix lint issues
dineshg13 Jul 3, 2023
72f4fda
Merge branch 'main' into dinesh.gurumurthy/multiprocess
dineshg13 Jul 8, 2023
cc9ef9a
add deployment yaml
dineshg13 Jul 8, 2023
4022e04
add chmod
dineshg13 Jul 10, 2023
05e27fe
Add tests for k8s deployment
damemi Jul 10, 2023
0bf47c0
read env vars from /proc
dineshg13 Jul 12, 2023
aed3934
remove ignore process + read service name from env var
dineshg13 Jul 13, 2023
8f06d0f
merge main
dineshg13 Jul 13, 2023
50b5c4e
Update generated offsets (#224)
github-actions[bot] Jul 15, 2023
1e2c979
Bump go.opentelemetry.io/build-tools/multimod in /internal/tools (#225)
dependabot[bot] Jul 16, 2023
f9f6572
Bump helm/kind-action from 1.7.0 to 1.8.0 (#226)
dependabot[bot] Jul 16, 2023
9874331
BPF FS: add support for multiple processes (#211)
edeNFed Jul 23, 2023
f5c4c06
Commit message
dineshg13 Jul 31, 2023
f79b635
Merge branch 'main' into dinesh.gurumurthy/multiprocess
dineshg13 Jul 31, 2023
61b620d
Merge branch 'main' into dinesh.gurumurthy/multiprocess
dineshg13 Aug 4, 2023
725e58b
merge main
dineshg13 Aug 8, 2023
7685fa1
merge main
dineshg13 Aug 8, 2023
a35693f
Address PR comments
dineshg13 Aug 16, 2023
db9989f
Merge main
dineshg13 Aug 21, 2023
4a42cc5
Update CHANGELOG.md
dineshg13 Aug 21, 2023
ed03b50
Update CHANGELOG.md
dineshg13 Aug 21, 2023
0340fdc
Update CHANGELOG.md
dineshg13 Sep 5, 2023
1cedbe6
merge main
dineshg13 Sep 5, 2023
a69ff54
non-compiling changes
dineshg13 Oct 4, 2023
5b1693e
local changes - non compiling
dineshg13 Oct 10, 2023
de6e29a
Add withPID
dineshg13 Oct 14, 2023
e3aaff3
remove un-needed tests
dineshg13 Oct 15, 2023
1183064
Merge main
dineshg13 Oct 15, 2023
ed52f93
remove gin from tests
dineshg13 Oct 15, 2023
39cc03a
Merge branch 'main' into dinesh.gurumurthy/multiprocess
dineshg13 Oct 15, 2023
ef606ef
fix license
dineshg13 Oct 15, 2023
6137c90
Update changelog
dineshg13 Oct 15, 2023
89d30e4
fix version
dineshg13 Oct 15, 2023
fc13d3d
fix path
dineshg13 Oct 15, 2023
937cb8b
remove gorillamux
dineshg13 Oct 15, 2023
2931fdf
fix test
dineshg13 Oct 15, 2023
2f206d9
Merge branch 'main' into dinesh.gurumurthy/multiprocess
dineshg13 Oct 15, 2023
09049d7
Add OTel go sql statement
dineshg13 Oct 15, 2023
6b1ae19
Merge branch 'main' into dinesh.gurumurthy/multiprocess
pdelewski Oct 17, 2023
67867d4
Merge branch 'main' into dinesh.gurumurthy/multiprocess
dineshg13 Oct 19, 2023
e598e1d
Update cli/main.go
dineshg13 Oct 19, 2023
11cb8d1
Update Makefile
dineshg13 Oct 19, 2023
92dd99a
PR feedback - move context to Run method
dineshg13 Oct 19, 2023
1bf77a5
Merge remote-tracking branch 'origin/main' into dinesh.gurumurthy/mul…
dineshg13 Oct 19, 2023
e778cfb
PR feedback - move changelog to un-released
dineshg13 Oct 19, 2023
4bcce14
Merge main
dineshg13 Oct 22, 2023
fa996bd
PR feedback - comments + remove monitor All
dineshg13 Oct 22, 2023
74ecf77
PR feedback - change name + add ticker stop method
dineshg13 Oct 22, 2023
c6ccf5a
Fix tests
dineshg13 Oct 22, 2023
715c21d
Merge branch 'main' into dinesh.gurumurthy/multiprocess
dineshg13 Oct 22, 2023
26748f8
Merge main + PR feedback on ctx
dineshg13 Oct 23, 2023
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ OpenTelemetry Go Automatic Instrumentation adheres to [Semantic Versioning](http

- Add net/http client instrumentor. ([#91](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/91))
- Add context propagation to net/http server instrumentation. ([#92](https://github.com/open-telemetry/opentelemetry-go-instrumentation/pull/92))
- Add ability to monitor multiple processes. ([#197](https://github.com/open-telemetry/opentelemetry-go-instrumentation/issues/197))

### Changed

1 change: 1 addition & 0 deletions Makefile
@@ -133,6 +133,7 @@ fixtures/%:
git clone https://github.com/open-telemetry/opentelemetry-helm-charts.git; \
fi
helm install test -f .github/workflows/e2e/k8s/collector-helm-values.yml opentelemetry-helm-charts/charts/opentelemetry-collector
sleep 5
kubectl wait --for=condition=Ready --timeout=60s pod/test-opentelemetry-collector-0
kubectl -n default create -f .github/workflows/e2e/k8s/sample-job.yml
kubectl wait --for=condition=Complete --timeout=60s job/sample-job
106 changes: 61 additions & 45 deletions cli/main.go
@@ -15,18 +15,41 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
"runtime"
"strings"
"syscall"

"go.opentelemetry.io/auto/pkg/errors"
"go.opentelemetry.io/auto/pkg/instrumentors"
"google.golang.org/grpc"

"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"

"go.opentelemetry.io/auto"
"go.opentelemetry.io/auto/pkg/log"
"go.opentelemetry.io/auto/pkg/opentelemetry"
"go.opentelemetry.io/auto/pkg/orchestrator"
"go.opentelemetry.io/auto/pkg/process"
)

var (
// Controller-local reference to the auto-instrumentation release version.
releaseVersion = auto.Version()
// Start of this auto-instrumentation's exporter User-Agent header, e.g. "OTel-Go-Auto-Instrumentation/1.2.3".
baseUserAgent = fmt.Sprintf("OTel-Go-Auto-Instrumentation/%s", releaseVersion)
// Information about the runtime environment for inclusion in User-Agent, e.g. "go/1.18.2 (linux/amd64)".
runtimeInfo = fmt.Sprintf(
"%s (%s/%s)",
strings.Replace(runtime.Version(), "go", "go/", 1),
runtime.GOOS,
runtime.GOARCH,
)
// Combined User-Agent identifying this auto-instrumentation and its runtime environment, see RFC7231 for format considerations.
autoinstUserAgent = fmt.Sprintf("%s %s", baseUserAgent, runtimeInfo)
)

func main() {
err := log.Init()
if err != nil {
@@ -35,56 +58,49 @@ func main() {
}

log.Logger.V(0).Info("starting Go OpenTelemetry Agent ...")
target := process.ParseTargetArgs()
if err = target.Validate(); err != nil {
log.Logger.Error(err, "invalid target args")
return
}

processAnalyzer := process.NewAnalyzer()
otelController, err := opentelemetry.NewController()
ctx := contextWithSigterm(context.Background())
log.Logger.V(0).Info("Establishing connection to OTLP receiver ...")
otlpTraceClient := otlptracegrpc.NewClient(
otlptracegrpc.WithDialOption(grpc.WithUserAgent(autoinstUserAgent)),
)
traceExporter, err := otlptrace.New(ctx, otlpTraceClient)
if err != nil {
log.Logger.Error(err, "unable to create OpenTelemetry controller")
log.Logger.Error(err, "unable to connect to OTLP endpoint")
return
}

instManager, err := instrumentors.NewManager(otelController)
targetArgs := process.ParseTargetArgs()
if targetArgs != nil {
if err := targetArgs.Validate(); err != nil {
log.Logger.Error(err, "invalid target args")
return
}
}
r, err := orchestrator.New(ctx, targetArgs, traceExporter)
if err != nil {
log.Logger.Error(err, "error creating instrumetors manager")
return
log.Logger.V(0).Error(err, "creating orchestrator")
}
if err = r.Run(); err != nil {
log.Logger.Error(err, "running orchestrator")
}
}

stopper := make(chan os.Signal, 1)
signal.Notify(stopper, os.Interrupt, syscall.SIGTERM)
go func() {
<-stopper
log.Logger.V(0).Info("Got SIGTERM, cleaning up..")
processAnalyzer.Close()
instManager.Close()
}()
func contextWithSigterm(parent context.Context) context.Context {
ctx, cancel := context.WithCancel(parent)

pid, err := processAnalyzer.DiscoverProcessID(target)
if err != nil {
if err != errors.ErrInterrupted {
log.Logger.Error(err, "error while discovering process id")
}
return
}
ch := make(chan os.Signal, 1)
signal.Notify(ch, os.Interrupt, syscall.SIGTERM)

targetDetails, err := processAnalyzer.Analyze(pid, instManager.GetRelevantFuncs())
if err != nil {
log.Logger.Error(err, "error while analyzing target process")
return
}
log.Logger.V(0).Info("target process analysis completed", "pid", targetDetails.PID,
"go_version", targetDetails.GoVersion, "dependencies", targetDetails.Libraries,
"total_functions_found", len(targetDetails.Functions))
go func() {
defer close(ch)
defer signal.Stop(ch)

instManager.FilterUnusedInstrumentors(targetDetails)
select {
case <-parent.Done(): // if parent is cancelled, return
return
case <-ch: // if SIGTERM is received, cancel this context
cancel()
}
}()

log.Logger.V(0).Info("invoking instrumentors")
err = instManager.Run(targetDetails)
if err != nil && err != errors.ErrInterrupted {
log.Logger.Error(err, "error while running instrumentors")
}
return ctx
}
8 changes: 6 additions & 2 deletions pkg/opentelemetry/controller_test.go → cli/main_test.go
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package opentelemetry
package main

import (
"fmt"
@@ -24,5 +24,9 @@ import (
)

func TestUserAgent(t *testing.T) {
assert.Contains(t, autoinstUserAgent, fmt.Sprintf("OTel-Go-Auto-Instrumentation/%s", auto.Version()))
assert.Contains(
t,
autoinstUserAgent,
fmt.Sprintf("OTel-Go-Auto-Instrumentation/%s", auto.Version()),
)
}
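The User-Agent string checked by `TestUserAgent` is assembled from the release version and the Go runtime details. The sketch below shows that composition as a pure function so the resulting format is easy to see. `userAgent` is a hypothetical helper written for illustration (the PR builds the string from package-level variables), and `1.2.3` is a placeholder version.

```go
package main

import (
	"fmt"
	"runtime"
	"strings"
)

// userAgent builds a string of the form
// "OTel-Go-Auto-Instrumentation/<version> go/<go version> (<os>/<arch>)".
// It is a hypothetical helper, not a function defined in this PR.
func userAgent(version, goVersion, goos, goarch string) string {
	base := fmt.Sprintf("OTel-Go-Auto-Instrumentation/%s", version)
	// runtime.Version() reports e.g. "go1.18.2"; rewrite it to "go/1.18.2".
	rt := fmt.Sprintf("%s (%s/%s)", strings.Replace(goVersion, "go", "go/", 1), goos, goarch)
	return fmt.Sprintf("%s %s", base, rt)
}

func main() {
	// "1.2.3" is a placeholder, not a real release number.
	fmt.Println(userAgent("1.2.3", runtime.Version(), runtime.GOOS, runtime.GOARCH))
}
```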
25 changes: 16 additions & 9 deletions pkg/instrumentors/manager.go
@@ -30,26 +30,24 @@ import (
"go.opentelemetry.io/auto/pkg/process"
)

var (
// Error message returned when unable to find all instrumentation functions.
errNotAllFuncsFound = fmt.Errorf("not all functions found for instrumentation")
)
// Error message returned when unable to find all instrumentation functions.
var errNotAllFuncsFound = fmt.Errorf("not all functions found for instrumentation")

// Manager handles the management of [Instrumentor] instances.
type Manager struct {
instrumentors map[string]Instrumentor
done chan bool
incomingEvents chan *events.Event
otelController *opentelemetry.Controller
allocator *allocator.Allocator
otelController *opentelemetry.Controller
}

// NewManager returns a new [Manager].
func NewManager(otelController *opentelemetry.Controller) (*Manager, error) {
m := &Manager{
instrumentors: make(map[string]Instrumentor),
done: make(chan bool, 1),
incomingEvents: make(chan *events.Event),
incomingEvents: make(chan *events.Event, 10),
otelController: otelController,
allocator: allocator.New(),
}
@@ -84,9 +82,9 @@ func (m *Manager) GetRelevantFuncs() map[string]interface{} {
return funcsMap
}

// FilterUnusedInstrumentors filterers Instrumentors whose functions are
// filterUnusedInstrumentors filterers Instrumentors whose functions are
// already instrumented out of the Manager.
func (m *Manager) FilterUnusedInstrumentors(target *process.TargetDetails) {
func (m *Manager) filterUnusedInstrumentors(target *process.TargetDetails) {
existingFuncMap := make(map[string]interface{})
for _, f := range target.Functions {
existingFuncMap[f.Name] = nil
@@ -102,7 +100,16 @@ func (m *Manager) FilterUnusedInstrumentors(target *process.TargetDetails) {

if funcsFound != len(inst.FuncNames()) {
if funcsFound > 0 {
log.Logger.Error(errNotAllFuncsFound, "some of expected functions not found - check instrumented functions", "instrumentation_name", name, "funcs_found", funcsFound, "funcs_expected", len(inst.FuncNames()))
log.Logger.Error(
errNotAllFuncsFound,
"some of expected functions not found - check instrumented functions",
"instrumentation_name",
name,
"funcs_found",
funcsFound,
"funcs_expected",
len(inst.FuncNames()),
)
}
delete(m.instrumentors, name)
}
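The filtering step in `filterUnusedInstrumentors` keeps an instrumentor only when every function it needs exists in the target, and logs an error for partial matches. The sketch below reproduces that logic with simplified stand-in types: plain string slices instead of `Instrumentor` and `process.TargetDetails`. The names `filterUnused`, `full`, `partialMatch`, and `noMatch` are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// filterUnused deletes every entry of required whose function names are not
// all present in available. It returns the names of entries that matched only
// partially, sorted for stable output. Types are simplified stand-ins.
func filterUnused(required map[string][]string, available []string) []string {
	present := make(map[string]struct{}, len(available))
	for _, f := range available {
		present[f] = struct{}{}
	}

	var partial []string
	for name, funcs := range required {
		found := 0
		for _, f := range funcs {
			if _, ok := present[f]; ok {
				found++
			}
		}
		if found != len(funcs) {
			if found > 0 {
				// Some but not all functions exist: worth reporting.
				partial = append(partial, name)
			}
			// Deleting the current key while ranging over a map is safe in Go.
			delete(required, name)
		}
	}
	sort.Strings(partial)
	return partial
}

func main() {
	required := map[string][]string{
		"full":         {"full.F"},
		"partialMatch": {"partial.F", "partial.G"},
		"noMatch":      {"none.F"},
	}
	partial := filterUnused(required, []string{"full.F", "partial.F"})
	fmt.Println(len(required), partial)
}
```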
8 changes: 5 additions & 3 deletions pkg/instrumentors/runner.go
@@ -28,16 +28,18 @@ import (

// Run runs the event processing loop for all managed Instrumentors.
func (m *Manager) Run(target *process.TargetDetails) error {
m.filterUnusedInstrumentors(target)

if len(m.instrumentors) == 0 {
log.Logger.V(0).Info("there are no available instrumentations for target process")
return nil
}
log.Logger.V(0).Info("running instrumentors", "number", fmt.Sprintf("%d", len(m.instrumentors)))

err := m.load(target)
if err != nil {
return err
}

for _, i := range m.instrumentors {
go i.Run(m.incomingEvents)
}
@@ -91,12 +93,12 @@ func (m *Manager) load(target *process.TargetDetails) error {
}
}

log.Logger.V(0).Info("loaded instrumentors to memory", "total_instrumentors", len(m.instrumentors))
log.Logger.V(0).
Info("loaded instrumentors to memory", "total_instrumentors", len(m.instrumentors))
return nil
}

func (m *Manager) cleanup() {
close(m.incomingEvents)
for _, i := range m.instrumentors {
i.Close()
}
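The hunks above give `incomingEvents` a buffer and touch the `close(m.incomingEvents)` call in `cleanup`. One general Go constraint that applies to a channel shared by several sending goroutines: a send on a closed channel panics, so shutdown is usually signalled through a separate channel instead. The sketch below shows that pattern under simplified assumptions. `event`, `produce`, and `collect` are hypothetical names, and this is not a statement of the authors' reasoning.

```go
package main

import (
	"fmt"
	"sync"
)

// event is a simplified stand-in for the PR's events.Event type.
type event struct{ source string }

// produce sends up to n events to out and gives up as soon as done is closed.
// Because shutdown is signalled via done, nobody needs to close out.
func produce(source string, n int, out chan<- event, done <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < n; i++ {
		select {
		case out <- event{source: source}:
		case <-done:
			return
		}
	}
}

// collect starts one producer per source, receives want events, then signals
// shutdown and waits for all producers to exit. The caller must ensure that
// want <= len(sources)*perSource, otherwise the receive loop blocks forever.
func collect(sources []string, perSource, want int) int {
	events := make(chan event, 10) // buffered, as in the PR
	done := make(chan struct{})
	var wg sync.WaitGroup
	for _, s := range sources {
		wg.Add(1)
		go produce(s, perSource, events, done, &wg)
	}

	received := 0
	for received < want {
		<-events
		received++
	}
	close(done)
	wg.Wait()
	return received
}

func main() {
	fmt.Println(collect([]string{"a", "b", "c"}, 5, 8))
}
```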
53 changes: 10 additions & 43 deletions pkg/opentelemetry/controller.go
@@ -16,39 +16,19 @@ package opentelemetry

import (
"context"
"fmt"
"os"
"runtime"
"strings"
"time"

"golang.org/x/sys/unix"
"google.golang.org/grpc"

"go.opentelemetry.io/auto"
"go.opentelemetry.io/auto/pkg/instrumentors/events"
"go.opentelemetry.io/auto/pkg/log"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.18.0"
"go.opentelemetry.io/otel/trace"
)

const (
otelServiceNameEnvVar = "OTEL_SERVICE_NAME"
)

var (
// Controller-local reference to the auto-instrumentation release version.
releaseVersion = auto.Version()
// Start of this auto-instrumentation's exporter User-Agent header, e.g. ""OTel-Go-Auto-Instrumentation/1.2.3".
baseUserAgent = fmt.Sprintf("OTel-Go-Auto-Instrumentation/%s", releaseVersion)
// Information about the runtime environment for inclusion in User-Agent, e.g. "go/1.18.2 (linux/amd64)".
runtimeInfo = fmt.Sprintf("%s (%s/%s)", strings.Replace(runtime.Version(), "go", "go/", 1), runtime.GOOS, runtime.GOARCH)
// Combined User-Agent identifying this auto-instrumentation and its runtime environment, see RFC7231 for format considerations.
autoinstUserAgent = fmt.Sprintf("%s %s", baseUserAgent, runtimeInfo)
"go.opentelemetry.io/auto"
"go.opentelemetry.io/auto/pkg/instrumentors/events"
"go.opentelemetry.io/auto/pkg/log"
)

// Controller handles OpenTelemetry telemetry generation for events.
@@ -76,7 +56,6 @@ func (c *Controller) Trace(event *events.Event) {

if event.SpanContext == nil {
log.Logger.V(0).Info("got event without context - dropping")
return
}

// TODO: handle remote parent
@@ -98,35 +77,23 @@ func (c *Controller) convertTime(t int64) time.Time {
}

// NewController returns a new initialized [Controller].
func NewController() (*Controller, error) {
serviceName, exists := os.LookupEnv(otelServiceNameEnvVar)
if !exists {
return nil, fmt.Errorf("%s env var must be set", otelServiceNameEnvVar)
}

ctx := context.Background()
func NewController(
ctx context.Context,
serviceName string,
exporter sdktrace.SpanExporter,
) (*Controller, error) {
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceNameKey.String(serviceName),
semconv.TelemetrySDKLanguageGo,
semconv.TelemetryAutoVersionKey.String(releaseVersion),
semconv.TelemetryAutoVersionKey.String(auto.Version()),
),
)
if err != nil {
return nil, err
}

log.Logger.V(0).Info("Establishing connection to OTLP receiver ...")
otlpTraceClient := otlptracegrpc.NewClient(
otlptracegrpc.WithDialOption(grpc.WithUserAgent(autoinstUserAgent)),
)
traceExporter, err := otlptrace.New(ctx, otlpTraceClient)
if err != nil {
log.Logger.Error(err, "unable to connect to OTLP endpoint")
return nil, err
}

bsp := sdktrace.NewBatchSpanProcessor(traceExporter)
bsp := sdktrace.NewBatchSpanProcessor(exporter)
tracerProvider := sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithResource(res),