Skip to content

Commit

Permalink
Report opamp health (#1432)
Browse files Browse the repository at this point in the history
Co-authored-by: Tamir David <tamirdavid@Tamirs-MacBook-Pro.local>
  • Loading branch information
tamirdavid1 and Tamir David authored Aug 7, 2024
1 parent a45fa43 commit 0a02c79
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 25 deletions.
14 changes: 13 additions & 1 deletion agents/python/configurator/__init__.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import threading
import atexit
import sys
import os
import opentelemetry.sdk._configuration as sdk_config

from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.resources import ProcessResourceDetector, OTELResourceDetector
from .lib_handling import reorder_python_path, reload_distro_modules
from .version import VERSION
from opamp.http_client import OpAMPHTTPClient


MINIMUM_PYTHON_SUPPORTED_VERSION = (3, 8)

class OdigosPythonConfigurator(sdk_config._BaseConfigurator):

def _configure(self, **kwargs):
Expand Down Expand Up @@ -71,7 +76,10 @@ def initialize_logging_if_enabled(log_exporters, resource):
def start_opamp_client(event):
condition = threading.Condition(threading.Lock())
client = OpAMPHTTPClient(event, condition)
client.start()

python_version_supported = is_supported_python_version()

client.start(python_version_supported)

def shutdown():
client.shutdown()
Expand All @@ -80,3 +88,7 @@ def shutdown():
atexit.register(shutdown)

return client


def is_supported_python_version():
return sys.version_info >= MINIMUM_PYTHON_SUPPORTED_VERSION
7 changes: 7 additions & 0 deletions agents/python/opamp/health_status.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
from enum import Enum

class AgentHealthStatus(str, Enum):
HEALTHY = "Healthy"
STARTING = "Starting"
UNSUPPORTED_RUNTIME_VERSION = "UnsupportedRuntimeVersion"
TERMINATED = "ProcessTerminated"
78 changes: 67 additions & 11 deletions agents/python/opamp/http_client.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import sys
import time
import threading
import requests
Expand All @@ -14,7 +15,7 @@
)

from opamp import opamp_pb2, anyvalue_pb2, utils

from opamp.health_status import AgentHealthStatus

# Setup the logger
opamp_logger = logging.getLogger(__name__)
Expand All @@ -35,7 +36,18 @@ def __init__(self, event, condition: threading.Condition):
self.instance_uid = uuid7().__str__()
self.remote_config_status = None

def start(self):

def start(self, python_version_supported: bool = None):
if not python_version_supported:

python_version = f'{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}'
error_message = f"Opentelemetry SDK require Python in version 3.8 or higher [{python_version} is not supported]"

opamp_logger.warning(f"{error_message}, sending disconnect message to OpAMP server...")
self.send_unsupported_version_disconnect_message(error_message=error_message)
self.event.set()
return

self.client_thread = threading.Thread(target=self.run, name="OpAMPClientThread", daemon=True)
self.client_thread.start()

Expand All @@ -50,14 +62,39 @@ def run(self):

self.worker()

def send_unsupported_version_disconnect_message(self, error_message: str) -> None:
first_disconnect_message = opamp_pb2.AgentToServer()

agent_description = self.get_agent_description()

first_disconnect_message.agent_description.CopyFrom(agent_description)

agent_disconnect = self.get_agent_disconnect()
first_disconnect_message.agent_disconnect.CopyFrom(agent_disconnect)

agent_health = self.get_agent_health(component_health=False, last_error=error_message, status=AgentHealthStatus.UNSUPPORTED_RUNTIME_VERSION.value)
first_disconnect_message.health.CopyFrom(agent_health)

self.send_agent_to_server_message(first_disconnect_message)

def send_first_message_with_retry(self) -> None:
max_retries = 5
delay = 2
for attempt in range(1, max_retries + 1):
try:
first_message_server_to_agent = self.send_full_state()
# Send first message to OpAMP server, Health is false as the component is not initialized
agent_health = self.get_agent_health(component_health=False, last_error="Python OpenTelemetry agent is starting", status=AgentHealthStatus.STARTING.value)
agent_description = self.get_agent_description()
first_message_server_to_agent = self.send_agent_to_server_message(opamp_pb2.AgentToServer(agent_description=agent_description, health=agent_health))

self.update_remote_config_status(first_message_server_to_agent)
self.resource_attributes = utils.parse_first_message_to_resource_attributes(first_message_server_to_agent, opamp_logger)

# Send healthy message to OpAMP server
opamp_logger.info("Reporting healthy to OpAMP server...")
agent_health = self.get_agent_health(component_health=True, status=AgentHealthStatus.HEALTHY.value)
self.send_agent_to_server_message(opamp_pb2.AgentToServer(health=agent_health))

break
except Exception as e:
opamp_logger.error(f"Error sending full state to OpAMP server: {e}")
Expand All @@ -76,7 +113,13 @@ def worker(self):

if server_to_agent.flags & opamp_pb2.ServerToAgentFlags_ReportFullState:
opamp_logger.info("Received request to report full state")
server_to_agent = self.send_full_state()

agent_description = self.get_agent_description()
agent_health = self.get_agent_health(component_health=True, status=AgentHealthStatus.HEALTHY.value)
agent_to_server = opamp_pb2.AgentToServer(agent_description=agent_description, health=agent_health)

server_to_agent = self.send_agent_to_server_message(agent_to_server)

self.update_remote_config_status(server_to_agent)

except requests.RequestException as e:
Expand All @@ -91,9 +134,7 @@ def send_heartbeat(self) -> opamp_pb2.ServerToAgent:
except requests.RequestException as e:
opamp_logger.error(f"Error sending heartbeat to OpAMP server: {e}")

def send_full_state(self):
opamp_logger.info("Sending full state to OpAMP server...")

def get_agent_description(self) -> opamp_pb2.AgentDescription:
identifying_attributes = [
anyvalue_pb2.KeyValue(
key=ResourceAttributes.SERVICE_INSTANCE_ID,
Expand All @@ -109,11 +150,26 @@ def send_full_state(self):
)
]

agent_description = opamp_pb2.AgentDescription(
return opamp_pb2.AgentDescription(
identifying_attributes=identifying_attributes,
non_identifying_attributes=[]
)
return self.send_agent_to_server_message(opamp_pb2.AgentToServer(agent_description=agent_description))

def get_agent_disconnect(self) -> opamp_pb2.AgentDisconnect:
return opamp_pb2.AgentDisconnect()

def get_agent_health(self, component_health: bool = None, last_error : str = None, status: str = None) -> opamp_pb2.ComponentHealth:
health = opamp_pb2.ComponentHealth(
)
if component_health is not None:
health.healthy = component_health
if last_error is not None:
health.last_error = last_error
if status is not None:
health.status = status

return health


def send_agent_to_server_message(self, message: opamp_pb2.AgentToServer) -> opamp_pb2.ServerToAgent:

Expand Down Expand Up @@ -166,9 +222,9 @@ def mandatory_env_vars_set(self):

def shutdown(self):
self.running = False

opamp_logger.info("Sending agent disconnect message to OpAMP server...")
disconnect_message = opamp_pb2.AgentToServer(agent_disconnect=opamp_pb2.AgentDisconnect())
agent_health = self.get_agent_health(component_health=False, last_error="Python runtime is exiting", status=AgentHealthStatus.TERMINATED.value)
disconnect_message = opamp_pb2.AgentToServer(agent_disconnect=opamp_pb2.AgentDisconnect(), health=agent_health)

with self.condition:
self.condition.notify_all()
Expand Down
2 changes: 1 addition & 1 deletion cli/cmd/describe.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func printPodContainerInstrumentationInstancesInfo(instances []*odigosv1.Instrum
if instance.Status.Message != "" {
fmt.Println(" Message:", instance.Status.Message)
}
if instance.Status.Reason != "" {
if instance.Status.Reason != "" && instance.Status.Reason != string(common.AgentHealthStatusHealthy) {
fmt.Println(" Reason:", instance.Status.Reason)
}
if unhealthy {
Expand Down
6 changes: 3 additions & 3 deletions common/agent_health_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,13 @@ const (

// AgentHealthStatusUnsupportedRuntimeVersion represents that the agent is running on an unsupported runtime version
// For example: Otel sdk supports node.js >= 14 and workload is running with node.js 12
AgentHealthStatusUnsupportedRuntimeVersion = "UnsupportedRuntimeVersion"
AgentHealthStatusUnsupportedRuntimeVersion AgentHealthStatus = "UnsupportedRuntimeVersion"

// AgentHealthStatusNoHeartbeat is when the server did not receive a 3 heartbeats from the agent, thus it is considered unhealthy
AgentHealthStatusNoHeartbeat = "NoHeartbeat"
AgentHealthStatusNoHeartbeat AgentHealthStatus = "NoHeartbeat"

// AgentHealthStatusProcessTerminated is when the agent process is terminated.
// The termination can be due to normal shutdown (e.g. event loop run out of work)
// due to explicit termination (e.g. code calls exit(), or OS signal), or due to an error (e.g. unhandled exception)
AgentHealthProcessTerminated = "ProcessTerminated"
AgentHealthProcessTerminated AgentHealthStatus = "ProcessTerminated"
)
2 changes: 1 addition & 1 deletion opampserver/pkg/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ func (c *ConnectionHandlers) OnConnectionNoHeartbeat(ctx context.Context, connec
message := fmt.Sprintf("OpAMP server did not receive heartbeat from the agent, last message time: %s", connectionInfo.LastMessageTime.Format("2006-01-02 15:04:05 MST"))
// keep the instrumentation instance CR in unhealthy state so it can be used for troubleshooting
err := instrumentation_instance.UpdateInstrumentationInstanceStatus(ctx, connectionInfo.Pod, connectionInfo.ContainerName, c.kubeclient, connectionInfo.InstrumentedAppName, int(connectionInfo.Pid), c.scheme,
instrumentation_instance.WithHealthy(&healthy, common.AgentHealthStatusNoHeartbeat, &message),
instrumentation_instance.WithHealthy(&healthy, string(common.AgentHealthStatusNoHeartbeat), &message),
)
if err != nil {
return fmt.Errorf("failed to persist instrumentation instance health status on connection timedout: %w", err)
Expand Down
11 changes: 6 additions & 5 deletions opampserver/pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,12 @@ func StartOpAmpServer(ctx context.Context, logger logr.Logger, mgr ctrl.Manager,
return
}
}

err = handlers.UpdateInstrumentationInstanceStatus(ctx, &agentToServer, connectionInfo)
if err != nil {
logger.Error(err, "Failed to persist instrumentation device status")
// still return the opamp response
if connectionInfo != nil {
err = handlers.UpdateInstrumentationInstanceStatus(ctx, &agentToServer, connectionInfo)
if err != nil {
logger.Error(err, "Failed to persist instrumentation device status")
// still return the opamp response
}
}

if serverToAgent == nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ metadata:
labels:
instrumented-app: deployment-inventory
status:
healthy: null
healthy: true
identifyingAttributes:
- key: service.instance.id
(value != null): true
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/helm-chart/assert-instrumented-and-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ metadata:
labels:
instrumented-app: deployment-inventory
status:
healthy: null
healthy: true
identifyingAttributes:
- key: service.instance.id
(value != null): true
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/multi-apps/assert-instrumented-and-pipeline.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ metadata:
labels:
instrumented-app: deployment-inventory
status:
healthy: null
healthy: true
identifyingAttributes:
- key: service.instance.id
(value != null): true
Expand Down

0 comments on commit 0a02c79

Please sign in to comment.