Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Validation for required attributes #865

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion pkg/beholder/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ func TestClient(t *testing.T) {
"byte_key_1": []byte("byte_val_1"),
"str_slice_key_1": []string{"str_val_1", "str_val_2"},
"nil_key_1": nil,
"beholder_data_schema": "/schemas/ids/1001", // Required field, URI
"beholder_data_schema": "/schemas/ids/1001", // Required field, URI
"node_csa_key": "node_csa_val", // Required
"node_csa_signature": "mode_csa_signature_val", // Required
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this would be an attribute, the signature will be coming through the authentication headers

}
}
defaultMessageBody := []byte("body bytes")
Expand Down Expand Up @@ -168,6 +170,8 @@ func TestEmitterMessageValidation(t *testing.T) {
name: "Invalid URI",
attrs: Attributes{
"beholder_data_schema": "example-schema",
"node_csa_key": "node_csa_val", // Required
"node_csa_signature": "mode_csa_signature_val", // Required
},
exporterCalledTimes: 0,
expectedError: "'Metadata.BeholderDataSchema' Error:Field validation for 'BeholderDataSchema' failed on the 'uri' tag",
Expand All @@ -177,6 +181,8 @@ func TestEmitterMessageValidation(t *testing.T) {
exporterCalledTimes: 1,
attrs: Attributes{
"beholder_data_schema": "/example-schema/versions/1",
"node_csa_key": "node_csa_val", // Required
"node_csa_signature": "mode_csa_signature_val", // Required
},
expectedError: "",
},
Expand Down
4 changes: 4 additions & 0 deletions pkg/beholder/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ func ExampleNewClient() {
for range 10 {
err := beholder.GetEmitter().Emit(context.Background(), payloadBytes,
"beholder_data_schema", "/custom-message/versions/1", // required
"node_csa_key", "node_csa_val", // required
"node_csa_signature", "mode_csa_signature_val", // required
"beholder_data_type", "custom_message",
"foo", "bar",
)
Expand Down Expand Up @@ -105,6 +107,8 @@ func ExampleNewNoopClient() {

err := beholder.GetEmitter().Emit(context.Background(), []byte("test message"),
"beholder_data_schema", "/custom-message/versions/1", // required
"node_csa_key", "node_csa_val", // required
"node_csa_signature", "mode_csa_signature_val", // required
)
if err != nil {
log.Printf("Error emitting message: %v", err)
Expand Down
13 changes: 10 additions & 3 deletions pkg/beholder/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package beholder

import (
"fmt"
"time"

"github.com/go-playground/validator/v10"
"go.opentelemetry.io/otel/attribute"
Expand All @@ -22,9 +23,9 @@ type Metadata struct {
// The version of the CL node.
NodeVersion string
// mTLS public key for the node operator. This is used as an identity key but with the added benefit of being able to provide signatures.
NodeCsaKey string
NodeCsaKey string `validate:"required"`
Copy link
Contributor

@pkcll pkcll Oct 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@4of9, are these going to be added as resource attributes to beholder config (since they are static) or we want to pass them as regular attributes?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinkin NodeCsaKey would be included as a resource attribute so users don't have to add it to every message

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same applies to NodeCsaSignature

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want to add validation for resource attributes as well.
E.g check if NodeCsaKey is set then NodeCsaSignature should be set as well

// Signature from CSA private key.
NodeCsaSignature string
NodeCsaSignature string `validate:"required"`
DonID string
// The RDD network name the CL node is operating with.
NetworkName []string
Expand All @@ -41,6 +42,7 @@ type Metadata struct {
CapabilityVersion string
CapabilityName string
NetworkChainID string
Timestamp time.Time
}

func (m Metadata) Attributes() Attributes {
Expand All @@ -61,6 +63,7 @@ func (m Metadata) Attributes() Attributes {
"capability_version": m.CapabilityVersion,
"capability_name": m.CapabilityName,
"network_chain_id": m.NetworkChainID,
"timestamp": m.Timestamp,
}
}

Expand Down Expand Up @@ -160,6 +163,8 @@ func OtelAttr(key string, value any) otellog.KeyValue {
return otellog.Bool(key, v)
case []byte:
return otellog.Bytes(key, v)
case time.Time:
return otellog.Int64(key, v.Unix())
case nil:
return otellog.Empty(key)
case otellog.Value:
Expand Down Expand Up @@ -211,13 +216,15 @@ func (m *Metadata) FromAttributes(attrs Attributes) *Metadata {
m.CapabilityName = v.(string)
case "network_chain_id":
m.NetworkChainID = v.(string)
case "timestamp":
m.Timestamp = v.(time.Time)
}
}
return m
}

func NewMetadata(attrs Attributes) *Metadata {
m := &Metadata{}
m := &Metadata{Timestamp: time.Now().UTC()}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nanchano, We should try not to use attributes with unbounded cardinality.
It can cause various problems see this thread for details
https://chainlink-core.slack.com/archives/C06TEMFRPQD/p1729082258236499

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could be wrong, but the unbounded cardinality issue is a problem for metrics, not for custom events, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How would we wanna represent a timestamp then? As a string and let clients parse it?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It probably should be inside the custom message payload.
FYI there is Timestamp field exist in otel log record
https://opentelemetry.io/docs/specs/otel/logs/data-model/#log-and-event-record-definition

I could be wrong, but the unbounded cardinality issue is a problem for metrics, not for custom events, right?
yes, but attributes can be reused between different telemetry types e.g span attributes are copied to metric attributes.

not for custom events, right?
I depends how we are going to store it.

We can put it in the attribute and get back to it later.
Whats the use case for having it in attributes?

Copy link
Contributor

@pkcll pkcll Oct 22, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Loki suffer from labels hight cardinality as well:
See also https://utcc.utoronto.ca/~cks/space/blog/sysadmin/GrafanaLokiCardinalityProblem
https://grafana.com/docs/loki/latest/get-started/labels/#cardinality

This is high cardinality, and it can lead to significant performance degredation.

When we talk about cardinality we are referring to the combination of labels and values and the number of streams they create. High cardinality is using labels with a large range of possible values, such as ip, or combining many labels, even if they have a small and finite set of values, such as using status_code and action.

High cardinality causes Loki to build a huge index and to flush thousands of tiny chunks to the object store. Loki currently performs very poorly in this configuration. If not accounted for, high cardinality will significantly reduce the operability and cost-effectiveness of Loki.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, but custom events are not sent to Loki, so the cardinality issue is at play here, I think

m.FromAttributes(attrs)
return m
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/beholder/message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,17 @@ func ExampleMetadata() {
fmt.Printf("%#v\n", m)
fmt.Println(m.Attributes())
// Output:
// beholder.Metadata{BeholderDataSchema:"/schemas/ids/test_schema", NodeVersion:"v1.0.0", NodeCsaKey:"test_key", NodeCsaSignature:"test_signature", DonID:"test_don_id", NetworkName:[]string{"test_network"}, WorkflowID:"test_workflow_id", WorkflowName:"test_workflow_name", WorkflowOwnerAddress:"test_owner_address", WorkflowSpecID:"test_spec_id", WorkflowExecutionID:"test_execution_id", CapabilityContractAddress:"test_contract_address", CapabilityID:"test_capability_id", CapabilityVersion:"test_capability_version", CapabilityName:"test_capability_name", NetworkChainID:"test_chain_id"}
// map[beholder_data_schema:/schemas/ids/test_schema capability_contract_address:test_contract_address capability_id:test_capability_id capability_name:test_capability_name capability_version:test_capability_version don_id:test_don_id network_chain_id:test_chain_id network_name:[test_network] node_csa_key:test_key node_csa_signature:test_signature node_version:v1.0.0 workflow_execution_id:test_execution_id workflow_id:test_workflow_id workflow_name:test_workflow_name workflow_owner_address:test_owner_address workflow_spec_id:test_spec_id]
// beholder.Metadata{BeholderDataSchema:"/schemas/ids/test_schema", NodeVersion:"v1.0.0", NodeCsaKey:"test_key", NodeCsaSignature:"test_signature", DonID:"test_don_id", NetworkName:[]string{"test_network"}, WorkflowID:"test_workflow_id", WorkflowName:"test_workflow_name", WorkflowOwnerAddress:"test_owner_address", WorkflowSpecID:"test_spec_id", WorkflowExecutionID:"test_execution_id", CapabilityContractAddress:"test_contract_address", CapabilityID:"test_capability_id", CapabilityVersion:"test_capability_version", CapabilityName:"test_capability_name", NetworkChainID:"test_chain_id", Timestamp:time.Date(1, time.January, 1, 0, 0, 0, 0, time.UTC)}
// map[beholder_data_schema:/schemas/ids/test_schema capability_contract_address:test_contract_address capability_id:test_capability_id capability_name:test_capability_name capability_version:test_capability_version don_id:test_don_id network_chain_id:test_chain_id network_name:[test_network] node_csa_key:test_key node_csa_signature:test_signature node_version:v1.0.0 timestamp:0001-01-01 00:00:00 +0000 UTC workflow_execution_id:test_execution_id workflow_id:test_workflow_id workflow_name:test_workflow_name workflow_owner_address:test_owner_address workflow_spec_id:test_spec_id]

}

func ExampleValidate() {
validate := validator.New()

metadata := beholder.Metadata{}
metadata.NodeCsaKey = "test_key"
metadata.NodeCsaSignature = "test_signature"
if err := validate.Struct(metadata); err != nil {
fmt.Println(err)
}
Expand Down
Loading