Skip to content

Commit

Permalink
fix: send and get confighash on remote config changes (#1359)
Browse files Browse the repository at this point in the history
Co-authored-by: Tamir David <tamirdavid@Tamirs-MacBook-Pro.local>
  • Loading branch information
tamirdavid1 and Tamir David authored Jul 17, 2024
1 parent ef7322c commit 823fed3
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 15 deletions.
2 changes: 1 addition & 1 deletion agents/nodejs/src/opamp/client-http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ export class OpAMPClientHttp {
this.remoteConfigStatus = new RemoteConfigStatus({
lastRemoteConfigHash: remoteConfigOpampMessage.configHash,
status: RemoteConfigStatuses.RemoteConfigStatuses_FAILED,
errorMessage: "missing instrumentation libraries remote config",
errorMessage: "failed to apply the new remote config",
});
this.logger.warn(
"Error extracting remote config from OpAMP message",
Expand Down
39 changes: 30 additions & 9 deletions agents/python/opamp/http_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ def __init__(self, event, condition: threading.Condition):
self.event = event
self.next_sequence_num = 0
self.instance_uid = uuid7().__str__()
self.remote_config_status = None

def start(self):
self.client_thread = threading.Thread(target=self.run, name="OpAMPClientThread", daemon=True)
Expand All @@ -52,9 +53,10 @@ def run(self):
def send_first_message_with_retry(self) -> None:
max_retries = 5
delay = 2
for attempt in range(1, max_retries + 1):
first_message_server_to_agent = self.send_full_state()
for attempt in range(1, max_retries + 1):
try:
first_message_server_to_agent = self.send_full_state()
self.update_remote_config_status(first_message_server_to_agent)
self.resource_attributes = utils.parse_first_message_to_resource_attributes(first_message_server_to_agent, opamp_logger)
break
except Exception as e:
Expand All @@ -68,18 +70,24 @@ def worker(self):
with self.condition:
try:
server_to_agent = self.send_heartbeat()
if self.update_remote_config_status(server_to_agent):
opamp_logger.info("Remote config updated, applying changes...")
# TODO: implement changes based on the remote config

if server_to_agent.flags & opamp_pb2.ServerToAgentFlags_ReportFullState:
opamp_logger.info("Received request to report full state")
self.send_full_state()
server_to_agent = self.send_full_state()
self.update_remote_config_status(server_to_agent)

except requests.RequestException as e:
opamp_logger.error(f"Error fetching data: {e}")
self.condition.wait(30)
def send_heartbeat(self):

def send_heartbeat(self) -> opamp_pb2.ServerToAgent:
opamp_logger.debug("Sending heartbeat to OpAMP server...")
try:
return self.send_agent_to_server_message(opamp_pb2.AgentToServer())
agent_to_server = opamp_pb2.AgentToServer(remote_config_status=self.remote_config_status)
return self.send_agent_to_server_message(agent_to_server)
except requests.RequestException as e:
opamp_logger.error(f"Error sending heartbeat to OpAMP server: {e}")

Expand Down Expand Up @@ -107,9 +115,13 @@ def send_full_state(self):
)
return self.send_agent_to_server_message(opamp_pb2.AgentToServer(agent_description=agent_description))

def send_agent_to_server_message(self, message: opamp_pb2.AgentToServer) -> opamp_pb2.ServerToAgent:
def send_agent_to_server_message(self, message: opamp_pb2.AgentToServer) -> opamp_pb2.ServerToAgent:

message.instance_uid = self.instance_uid.encode('utf-8')
message.sequence_num = self.next_sequence_num
message.sequence_num = self.next_sequence_num
if self.remote_config_status:
message.remote_config_status.CopyFrom(self.remote_config_status)

self.next_sequence_num += 1
message_bytes = message.SerializeToString()

Expand Down Expand Up @@ -162,4 +174,13 @@ def shutdown(self):
self.condition.notify_all()
self.client_thread.join()

self.send_agent_to_server_message(disconnect_message)
self.send_agent_to_server_message(disconnect_message)

def update_remote_config_status(self, server_to_agent: opamp_pb2.ServerToAgent) -> bool:
if server_to_agent.HasField("remote_config"):
remote_config_hash = server_to_agent.remote_config.config_hash
remote_config_status = opamp_pb2.RemoteConfigStatus(last_remote_config_hash=remote_config_hash)
self.remote_config_status = remote_config_status
return True

return False
5 changes: 0 additions & 5 deletions opampserver/pkg/server/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,6 @@ func (c *ConnectionHandlers) OnNewConnection(ctx context.Context, deviceId strin
return nil, nil, fmt.Errorf("missing pid in agent description")
}

if firstMessage.PackageStatuses == nil {
// this might come from an older agent which not yet supports package statuses
c.logger.Info("missing package statuses in first agent to server message", "deviceId", deviceId)
}

k8sAttributes, pod, err := c.deviceIdCache.GetAttributesFromDevice(ctx, deviceId)
if err != nil {
c.logger.Error(err, "failed to get attributes from device", "deviceId", deviceId)
Expand Down

0 comments on commit 823fed3

Please sign in to comment.