Skip to content

Commit

Permalink
Python agent opamp (#1346)
Browse files Browse the repository at this point in the history
Co-authored-by: Tamir David <tamirdavid@Tamirs-MacBook-Pro.local>
  • Loading branch information
tamirdavid1 and Tamir David authored Jul 15, 2024
1 parent 1991f1f commit 0beff5f
Show file tree
Hide file tree
Showing 11 changed files with 505 additions and 126 deletions.
48 changes: 37 additions & 11 deletions agents/python/configurator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
# my_otel_configurator/__init__.py
import opentelemetry.sdk._configuration as sdk_config
import threading
import atexit
import os
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.resources import ProcessResourceDetector, OTELResourceDetector
from .version import VERSION

from opamp.http_client import OpAMPHTTPClient

class OdigosPythonConfigurator(sdk_config._BaseConfigurator):
    """SDK configurator that sets up the Odigos telemetry components."""

    def _configure(self, **kwargs):
        """Initialize the SDK components.

        Keyword arguments are accepted for compatibility with the base
        configurator interface but are not used.
        """
        # _initialize_components() takes no parameters, so it is called
        # exactly once and nothing from kwargs is forwarded to it.
        _initialize_components()

def _initialize_components(auto_instrumentation_version):
def _initialize_components():
trace_exporters, metric_exporters, log_exporters = sdk_config._import_exporters(
sdk_config._get_exporter_names("traces"),
sdk_config._get_exporter_names("metrics"),
Expand All @@ -21,14 +23,24 @@ def _initialize_components(auto_instrumentation_version):
"telemetry.distro.name": "odigos",
"telemetry.distro.version": VERSION,
}

resource_attributes_event = threading.Event()
client = start_opamp_client(resource_attributes_event)
resource_attributes_event.wait(timeout=30) # Wait for the resource attributes to be received for 30 seconds

received_value = client.resource_attributes

if received_value:
auto_resource.update(received_value)

resource = Resource.create(auto_resource) \
.merge(OTELResourceDetector().detect()) \
.merge(ProcessResourceDetector().detect())
resource = Resource.create(auto_resource) \
.merge(OTELResourceDetector().detect()) \
.merge(ProcessResourceDetector().detect())

initialize_traces_if_enabled(trace_exporters, resource)
initialize_metrics_if_enabled(metric_exporters, resource)
initialize_logging_if_enabled(log_exporters, resource)

initialize_traces_if_enabled(trace_exporters, resource)
initialize_metrics_if_enabled(metric_exporters, resource)
initialize_logging_if_enabled(log_exporters, resource)

def initialize_traces_if_enabled(trace_exporters, resource):
traces_enabled = os.getenv(sdk_config.OTEL_TRACES_EXPORTER, "none").strip().lower()
Expand All @@ -40,9 +52,23 @@ def initialize_traces_if_enabled(trace_exporters, resource):
def initialize_metrics_if_enabled(metric_exporters, resource):
    """Initialize the metrics pipeline unless the exporter setting is "none".

    The setting is read from the environment variable whose name is held in
    ``sdk_config.OTEL_METRICS_EXPORTER``; it defaults to "none" and is
    stripped and lower-cased before being compared.

    Args:
        metric_exporters: Passed through to ``sdk_config._init_metrics``.
        resource: Passed through to ``sdk_config._init_metrics``.
    """
    metrics_enabled = os.getenv(sdk_config.OTEL_METRICS_EXPORTER, "none").strip().lower()
    if metrics_enabled != "none":
        # Initialize exactly once.
        sdk_config._init_metrics(metric_exporters, resource)

def initialize_logging_if_enabled(log_exporters, resource):
    """Initialize the logging pipeline unless the exporter setting is "none".

    The setting is read from the environment variable whose name is held in
    ``sdk_config.OTEL_LOGS_EXPORTER``; it defaults to "none" and is stripped
    and lower-cased before being compared.

    Args:
        log_exporters: Passed through to ``sdk_config._init_logging``.
        resource: Passed through to ``sdk_config._init_logging``.
    """
    logging_enabled = os.getenv(sdk_config.OTEL_LOGS_EXPORTER, "none").strip().lower()
    if logging_enabled != "none":
        # Initialize exactly once.
        sdk_config._init_logging(log_exporters, resource)


def start_opamp_client(event):
    """Create and start an OpAMP HTTP client and schedule its shutdown at exit.

    Args:
        event: Handed to ``OpAMPHTTPClient`` together with a freshly created
            lock-backed ``threading.Condition``.

    Returns:
        The started ``OpAMPHTTPClient`` instance.
    """
    opamp_client = OpAMPHTTPClient(event, threading.Condition(threading.Lock()))
    opamp_client.start()

    # Make sure the client is shut down when the interpreter exits.
    atexit.register(opamp_client.shutdown)

    return opamp_client
Empty file added agents/python/opamp/__init__.py
Empty file.
33 changes: 33 additions & 0 deletions agents/python/opamp/anyvalue_pb2.py

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

165 changes: 165 additions & 0 deletions agents/python/opamp/http_client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
import os
import time
import threading
import requests
import logging

from uuid_extensions import uuid7
from opentelemetry.semconv.resource import ResourceAttributes
from opentelemetry.context import (
_SUPPRESS_HTTP_INSTRUMENTATION_KEY,
attach,
detach,
set_value,
)

from opamp import opamp_pb2, anyvalue_pb2, utils


# Module-level logger for the OpAMP client. It is configured at DEBUG level
# but disabled by default, so none of the opamp_logger calls in this module
# emit anything unless the `disabled` line below is commented out.
opamp_logger = logging.getLogger(__name__)
opamp_logger.setLevel(logging.DEBUG)
opamp_logger.disabled = True # Comment this line to enable the logger


class OpAMPHTTPClient:
    """Small OpAMP client that talks to an OpAMP server over plain HTTP.

    The client runs on a daemon thread: it first reports its full state
    (retrying a few times), stores the resource attributes parsed from the
    server's answer in ``resource_attributes``, signals ``event``, and then
    sends a heartbeat roughly every 30 seconds until ``shutdown()`` is called.

    The server host and the device id are read from the
    ``ODIGOS_OPAMP_SERVER_HOST`` and ``ODIGOS_INSTRUMENTATION_DEVICE_ID``
    environment variables; if either is missing the client thread exits
    immediately after signalling ``event``.
    """

    def __init__(self, event, condition: threading.Condition):
        """Read the configuration from the environment and set initial state.

        Args:
            event: Object whose ``set()`` is called once the first exchange
                with the server has been attempted (or skipped).
            condition: Condition used to pace the heartbeat loop and to wake
                it up on shutdown.
        """
        self.server_host = os.getenv('ODIGOS_OPAMP_SERVER_HOST')
        self.instrumentation_device_id = os.getenv('ODIGOS_INSTRUMENTATION_DEVICE_ID')
        self.server_url = f"http://{self.server_host}/v1/opamp"
        self.resource_attributes = {}
        self.running = True
        self.condition = condition
        self.event = event
        self.next_sequence_num = 0
        self.instance_uid = str(uuid7())
        # Set by start(); stays None when the client was never started so
        # that shutdown() can be called safely in that case.
        self.client_thread = None

    def start(self):
        """Start the background client thread (daemon)."""
        self.client_thread = threading.Thread(target=self.run, name="OpAMPClientThread", daemon=True)
        self.client_thread.start()

    def run(self):
        """Thread entry point: first message, signal ``event``, then heartbeats."""
        try:
            if not self.mandatory_env_vars_set():
                return
            self.send_first_message_with_retry()
        finally:
            # Always release whoever waits on the event, even if the first
            # exchange failed unexpectedly, so the waiter is not left to
            # run into its own timeout.
            self.event.set()

        self.worker()

    def send_first_message_with_retry(self) -> None:
        """Send the full state and parse the answer into ``resource_attributes``.

        Up to 5 attempts are made with a 2 second pause between them. Both
        the request and the parsing are retried; failures are logged and
        never raised.
        """
        max_retries = 5
        delay = 2
        for attempt in range(1, max_retries + 1):
            try:
                # The request is inside the try block so that HTTP errors
                # raised by send_full_state() are retried instead of ending
                # the client thread.
                first_message_server_to_agent = self.send_full_state()
                self.resource_attributes = utils.parse_first_message_to_resource_attributes(
                    first_message_server_to_agent, opamp_logger
                )
                break
            except Exception as e:
                opamp_logger.error(f"Error sending full state to OpAMP server: {e}")

            if attempt < max_retries:
                time.sleep(delay)

    def worker(self):
        """Heartbeat loop; runs until ``running`` is cleared by ``shutdown()``."""
        while self.running:
            with self.condition:
                try:
                    server_to_agent = self.send_heartbeat()
                    if server_to_agent.flags & opamp_pb2.ServerToAgentFlags_ReportFullState:
                        opamp_logger.info("Received request to report full state")
                        self.send_full_state()

                except requests.RequestException as e:
                    opamp_logger.error(f"Error fetching data: {e}")

                # shutdown() clears `running` before it takes the condition
                # and notifies, so checking the flag here (while holding the
                # condition) avoids starting a 30 second wait after the
                # notification has already been sent.
                if self.running:
                    self.condition.wait(30)

    def send_heartbeat(self):
        """Send an empty AgentToServer message.

        Returns:
            The server's answer, or an empty ``ServerToAgent`` message when
            the request fails, so callers can always read ``.flags``.
        """
        opamp_logger.debug("Sending heartbeat to OpAMP server...")
        try:
            return self.send_agent_to_server_message(opamp_pb2.AgentToServer())
        except requests.RequestException as e:
            opamp_logger.error(f"Error sending heartbeat to OpAMP server: {e}")
            return opamp_pb2.ServerToAgent()

    def send_full_state(self):
        """Send an agent description (instance id, pid, SDK language).

        Returns:
            The ``ServerToAgent`` answer from ``send_agent_to_server_message``.

        Raises:
            requests.RequestException: For request failures that
                ``send_agent_to_server_message`` does not absorb.
        """
        opamp_logger.info("Sending full state to OpAMP server...")

        identifying_attributes = [
            anyvalue_pb2.KeyValue(
                key=ResourceAttributes.SERVICE_INSTANCE_ID,
                value=anyvalue_pb2.AnyValue(string_value=self.instance_uid)
            ),
            anyvalue_pb2.KeyValue(
                key=ResourceAttributes.PROCESS_PID,
                value=anyvalue_pb2.AnyValue(int_value=os.getpid())
            ),
            anyvalue_pb2.KeyValue(
                key=ResourceAttributes.TELEMETRY_SDK_LANGUAGE,
                value=anyvalue_pb2.AnyValue(string_value="python")
            )
        ]

        agent_description = opamp_pb2.AgentDescription(
            identifying_attributes=identifying_attributes,
            non_identifying_attributes=[]
        )
        return self.send_agent_to_server_message(opamp_pb2.AgentToServer(agent_description=agent_description))

    def send_agent_to_server_message(self, message: "opamp_pb2.AgentToServer") -> "opamp_pb2.ServerToAgent":
        """POST one AgentToServer message and return the parsed answer.

        The instance uid and the next sequence number are stamped on the
        message before it is serialized. The request uses a 5 second timeout
        and is made with HTTP instrumentation suppressed in the current
        context.

        Returns:
            The parsed ``ServerToAgent`` message, or an empty one on timeout,
            connection error, or an unparsable response body.

        Raises:
            requests.RequestException: For other request failures, e.g. the
                ``HTTPError`` raised by ``raise_for_status()``.
        """
        message.instance_uid = self.instance_uid.encode('utf-8')
        message.sequence_num = self.next_sequence_num
        self.next_sequence_num += 1
        message_bytes = message.SerializeToString()

        headers = {
            "Content-Type": "application/x-protobuf",
            "X-Odigos-DeviceId": self.instrumentation_device_id
        }

        # Attach before entering the try block so the token is always bound
        # when the finally clause detaches it.
        suppress_token = attach(set_value(_SUPPRESS_HTTP_INSTRUMENTATION_KEY, True))
        try:
            response = requests.post(self.server_url, data=message_bytes, headers=headers, timeout=5)
            response.raise_for_status()
        except requests.Timeout:
            opamp_logger.error("Timeout sending message to OpAMP server")
            return opamp_pb2.ServerToAgent()
        except requests.ConnectionError as e:
            opamp_logger.error(f"Error sending message to OpAMP server: {e}")
            return opamp_pb2.ServerToAgent()
        finally:
            detach(suppress_token)

        server_to_agent = opamp_pb2.ServerToAgent()
        try:
            server_to_agent.ParseFromString(response.content)
        except Exception as e:
            # Caught broadly on purpose: a malformed body is reported by the
            # protobuf runtime with its own exception type, which this module
            # does not import. Any parse failure yields an empty message.
            opamp_logger.error(f"Error parsing response from OpAMP server: {e}")
            return opamp_pb2.ServerToAgent()
        return server_to_agent

    def mandatory_env_vars_set(self):
        """Return True when both the server host and the device id are set.

        Logs an error naming the first missing environment variable and
        returns False otherwise.
        """
        mandatory_env_vars = {
            "ODIGOS_OPAMP_SERVER_HOST": self.server_host,
            "ODIGOS_INSTRUMENTATION_DEVICE_ID": self.instrumentation_device_id
        }

        for env_var, value in mandatory_env_vars.items():
            if not value:
                opamp_logger.error(f"{env_var} environment variable not set")
                return False

        return True

    def shutdown(self):
        """Stop the heartbeat loop and send an agent-disconnect message.

        Safe to call when ``start()`` was never called. Request failures
        while sending the disconnect message are logged, not raised, because
        this method is intended to run during interpreter exit.
        """
        self.running = False

        with self.condition:
            self.condition.notify_all()

        # Join only after the condition has been released: the worker has to
        # re-acquire it to return from wait(), so joining while holding it
        # would block both threads.
        worker_thread = self.client_thread
        if worker_thread is not None and worker_thread is not threading.current_thread():
            worker_thread.join()

        if not (self.server_host and self.instrumentation_device_id):
            # The client never talked to a server, so there is nothing to
            # disconnect from.
            return

        opamp_logger.info("Sending agent disconnect message to OpAMP server...")
        disconnect_message = opamp_pb2.AgentToServer(agent_disconnect=opamp_pb2.AgentDisconnect())
        try:
            self.send_agent_to_server_message(disconnect_message)
        except requests.RequestException as e:
            opamp_logger.error(f"Error sending message to OpAMP server: {e}")
Loading

0 comments on commit 0beff5f

Please sign in to comment.