From 823fed3f9eb8eb83655c9e0b26a34bd7529a183c Mon Sep 17 00:00:00 2001 From: tamirdavid1 Date: Wed, 17 Jul 2024 09:22:14 +0300 Subject: [PATCH] fix: send and get confighash on remote config changes (#1359) Co-authored-by: Tamir David --- agents/nodejs/src/opamp/client-http.ts | 2 +- agents/python/opamp/http_client.py | 39 ++++++++++++++++++++------ opampserver/pkg/server/handlers.go | 5 ---- 3 files changed, 31 insertions(+), 15 deletions(-) diff --git a/agents/nodejs/src/opamp/client-http.ts b/agents/nodejs/src/opamp/client-http.ts index 24fea4dda..76caaa726 100644 --- a/agents/nodejs/src/opamp/client-http.ts +++ b/agents/nodejs/src/opamp/client-http.ts @@ -198,7 +198,7 @@ export class OpAMPClientHttp { this.remoteConfigStatus = new RemoteConfigStatus({ lastRemoteConfigHash: remoteConfigOpampMessage.configHash, status: RemoteConfigStatuses.RemoteConfigStatuses_FAILED, - errorMessage: "missing instrumentation libraries remote config", + errorMessage: "failed to apply the new remote config", }); this.logger.warn( "Error extracting remote config from OpAMP message", diff --git a/agents/python/opamp/http_client.py b/agents/python/opamp/http_client.py index 0e4e68f36..2fcdfe399 100644 --- a/agents/python/opamp/http_client.py +++ b/agents/python/opamp/http_client.py @@ -33,6 +33,7 @@ def __init__(self, event, condition: threading.Condition): self.event = event self.next_sequence_num = 0 self.instance_uid = uuid7().__str__() + self.remote_config_status = None def start(self): self.client_thread = threading.Thread(target=self.run, name="OpAMPClientThread", daemon=True) @@ -52,9 +53,10 @@ def run(self): def send_first_message_with_retry(self) -> None: max_retries = 5 delay = 2 - for attempt in range(1, max_retries + 1): - first_message_server_to_agent = self.send_full_state() + for attempt in range(1, max_retries + 1): try: + first_message_server_to_agent = self.send_full_state() + self.update_remote_config_status(first_message_server_to_agent) self.resource_attributes = utils.parse_first_message_to_resource_attributes(first_message_server_to_agent, opamp_logger) break except Exception as e: @@ -68,18 +70,24 @@ def worker(self): with self.condition: try: server_to_agent = self.send_heartbeat() + if self.update_remote_config_status(server_to_agent): + opamp_logger.info("Remote config updated, applying changes...") + # TODO: implement changes based on the remote config + if server_to_agent.flags & opamp_pb2.ServerToAgentFlags_ReportFullState: opamp_logger.info("Received request to report full state") - self.send_full_state() + server_to_agent = self.send_full_state() + self.update_remote_config_status(server_to_agent) except requests.RequestException as e: opamp_logger.error(f"Error fetching data: {e}") self.condition.wait(30) - - def send_heartbeat(self): + + def send_heartbeat(self) -> opamp_pb2.ServerToAgent: opamp_logger.debug("Sending heartbeat to OpAMP server...") try: - return self.send_agent_to_server_message(opamp_pb2.AgentToServer()) + agent_to_server = opamp_pb2.AgentToServer(remote_config_status=self.remote_config_status) + return self.send_agent_to_server_message(agent_to_server) except requests.RequestException as e: opamp_logger.error(f"Error sending heartbeat to OpAMP server: {e}") @@ -107,9 +115,13 @@ def send_full_state(self): ) return self.send_agent_to_server_message(opamp_pb2.AgentToServer(agent_description=agent_description)) - def send_agent_to_server_message(self, message: opamp_pb2.AgentToServer) -> opamp_pb2.ServerToAgent: + def send_agent_to_server_message(self, message: opamp_pb2.AgentToServer) -> opamp_pb2.ServerToAgent: + message.instance_uid = self.instance_uid.encode('utf-8') - message.sequence_num = self.next_sequence_num + message.sequence_num = self.next_sequence_num + if self.remote_config_status: + message.remote_config_status.CopyFrom(self.remote_config_status) + self.next_sequence_num += 1 message_bytes = message.SerializeToString() @@ -162,4 +174,13 @@ def shutdown(self): self.condition.notify_all() self.client_thread.join() - self.send_agent_to_server_message(disconnect_message) \ No newline at end of file + self.send_agent_to_server_message(disconnect_message) + + def update_remote_config_status(self, server_to_agent: opamp_pb2.ServerToAgent) -> bool: + if server_to_agent.HasField("remote_config"): + remote_config_hash = server_to_agent.remote_config.config_hash + remote_config_status = opamp_pb2.RemoteConfigStatus(last_remote_config_hash=remote_config_hash) + self.remote_config_status = remote_config_status + return True + + return False \ No newline at end of file diff --git a/opampserver/pkg/server/handlers.go b/opampserver/pkg/server/handlers.go index aba2d170e..18b6e2b41 100644 --- a/opampserver/pkg/server/handlers.go +++ b/opampserver/pkg/server/handlers.go @@ -55,11 +55,6 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin return nil, nil, fmt.Errorf("missing pid in agent description") } - if firstMessage.PackageStatuses == nil { - // this might come from an older agent which not yet supports package statuses - c.logger.Info("missing package statuses in first agent to server message", "deviceId", deviceId) - } - k8sAttributes, pod, err := c.deviceIdCache.GetAttributesFromDevice(ctx, deviceId) if err != nil { c.logger.Error(err, "failed to get attributes from device", "deviceId", deviceId)