-
-
Notifications
You must be signed in to change notification settings - Fork 561
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(rust): send telemetry data to the project node
| Conflicts: | implementations/rust/ockam/ockam_command/src/node/create.rs
- Loading branch information
1 parent
ce82cfc
commit 21125ec
Showing
30 changed files
with
516 additions
and
137 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
I12dea6479e2ececa711851c371f7bc9c16a162cd34cfe096386a76222c736f69 |
2 changes: 2 additions & 0 deletions
2
implementations/elixir/ockam/ockam_cloud_node/test/identity.secret
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,2 @@ | ||
��#!RCD_Ou����l���<5�;ɧ. | ||
��� |
68 changes: 68 additions & 0 deletions
68
implementations/elixir/ockam/ockam_cloud_node/test/start_node.exs
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
defmodule Start do | ||
@doc """ | ||
Start a local project node with the ability to create secure channels. | ||
The node identity is either retrieved from files, or created then stored in files. | ||
""" | ||
def start_node() do | ||
with {:ok, own_identity} <- get_or_create_identity(), | ||
{:ok, keypair} <- Ockam.SecureChannel.Crypto.generate_dh_keypair(), | ||
{:ok, attestation} <- Ockam.Identity.attest_purpose_key(own_identity, keypair) do | ||
Ockam.Services.start_service( | ||
:secure_channel, | ||
identity: own_identity, | ||
address: "api", | ||
encryption_options: [static_keypair: keypair, static_key_attestation: attestation] | ||
) | ||
end | ||
end | ||
|
||
@doc """ | ||
Retrieve or create an identity | ||
""" | ||
def get_or_create_identity() do | ||
case File.read(identity_path()) do | ||
{:ok, bytes} -> | ||
with {:ok, secret_bytes} <- File.read(secret_signing_key_path()), | ||
{:ok, identity, _identifier} <- Ockam.Identity.import(bytes, secret_bytes) do | ||
{:ok, identity} | ||
end | ||
|
||
{:error, :enoent} -> | ||
with {_pub, secret} <- :crypto.generate_key(:eddsa, :ed25519), | ||
{:ok, identity} <- Ockam.Identity.create(secret), | ||
:ok <- File.mkdir_p(Path.dirname(identity_id_path())), | ||
:ok <- | ||
File.write( | ||
identity_id_path(), | ||
Ockam.Identity.Identifier.to_str(Ockam.Identity.get_identifier(identity)) | ||
), | ||
:ok <- File.write(identity_path(), Ockam.Identity.get_data(identity)), | ||
:ok <- File.write(secret_signing_key_path(), secret) do | ||
{:ok, identity} | ||
end | ||
end | ||
end | ||
|
||
@doc """ | ||
File storing the identity identifier | ||
""" | ||
defp identity_id_path() do | ||
Path.join(Application.fetch_env!(:ockam_cloud_node, :storage_path), "identity.id") | ||
end | ||
|
||
@doc """ | ||
File storing the identity change history | ||
""" | ||
defp identity_path() do | ||
Path.join(Application.fetch_env!(:ockam_cloud_node, :storage_path), "identity") | ||
end | ||
|
||
@doc """ | ||
File storing the identity secret | ||
""" | ||
defp secret_signing_key_path() do | ||
Path.join(Application.fetch_env!(:ockam_cloud_node, :storage_path), "identity.secret") | ||
end | ||
end | ||
|
||
Start.start_node() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
89 changes: 89 additions & 0 deletions
89
implementations/elixir/ockam/ockam_services/lib/services/grpc_forwarder.ex
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
defmodule Ockam.Services.GrpcForwarder do | ||
@moduledoc false | ||
|
||
use Ockam.Worker | ||
|
||
alias Ockam.Message | ||
alias Ockam.Worker | ||
|
||
require Logger | ||
|
||
defmodule GrpcRequest do | ||
@moduledoc """ | ||
gRPC request. This request is created by a grpcClient and can be forwarded to a gRPC server. | ||
This struct is currently being used to receive telemetry data from another node via a secure channel | ||
and forward it to an OpenTelemetry collector. | ||
""" | ||
use TypedStruct | ||
|
||
typedstruct do | ||
plugin(Ockam.TypedCBOR.Plugin) | ||
field(:method, String.t(), minicbor: [key: 1]) | ||
field(:path, String.t(), minicbor: [key: 2]) | ||
|
||
field(:version, :http09 | :http10 | :http11 | :http2 | :http3, | ||
minicbor: [key: 3, schema: {:enum, [http09: 0, http10: 1, http11: 2, http2: 3, http3: 4]}] | ||
) | ||
|
||
field(:headers, list(list(String.t())), minicbor: [key: 4]) | ||
field(:body, :binary, minicbor: [key: 5]) | ||
end | ||
end | ||
|
||
@doc """ | ||
Start the service by opening a channel to the grpc endpoint. | ||
""" | ||
@impl true | ||
def setup(options, state) do | ||
grpc_endpoint = Keyword.get(options, :grpc_endpoint, "http://localhost:4317") | ||
{:ok, channel} = GRPC.Stub.connect(grpc_endpoint) | ||
Logger.debug("Starting a grpc forwarder to: #{grpc_endpoint}") | ||
|
||
state = | ||
state | ||
|> Map.put(:grpc_endpoint, grpc_endpoint) | ||
|> Map.put(:channel, channel) | ||
|
||
{:ok, state} | ||
end | ||
|
||
@doc """ | ||
A message sent to this service is first encoded as an Ockam Request, where the body of the request is the gRPC request. | ||
In order to forward that request to the grpc endpoint, we need to act as a gRPC client and: | ||
- Set the appropriate gRPC headers | ||
- Pass the request path (it should look like `opentelemetry.proto.collector.trace.v1.TraceService/Export`). | ||
In principle the gRPC request could specify a method other than POST but for now we only support POST. | ||
We eventually send a reply with an empty body. In a general case, we should provide a better reply but for our | ||
current telemetry use case this is good enough. | ||
""" | ||
@impl true | ||
def handle_message(message, state) do | ||
{:ok, decoded_request} = Ockam.API.Request.decode(message.payload) | ||
{:ok, grpc_request, _unused} = GrpcRequest.decode(decoded_request.body) | ||
|
||
channel = Map.get(state, :channel) | ||
|
||
headers = | ||
GRPC.Transport.HTTP2.client_headers_without_reserved(%{ | ||
channel: channel, | ||
codec: channel.codec, | ||
compressor: GRPC.Compressor.Gzip, | ||
accepted_compressors: nil, | ||
headers: grpc_request.headers |> Enum.map(fn [a, b] -> {a, b} end) | ||
}) | ||
|
||
:gun.post(channel.adapter_payload.conn_pid, grpc_request.path, headers, grpc_request.body) | ||
|
||
reply = Message.reply(message, state.address, <<>>) | ||
Worker.route(reply, state) | ||
|
||
{:ok, state} | ||
end | ||
|
||
@impl true | ||
def handle_info(_unused, state) do | ||
{:noreply, state} | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.