Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[receiver/awss3receiver]: Add ingest progress notifications via OpAMP #33980

Merged
merged 72 commits into from
Oct 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
72 commits
Select commit Hold shift + click to select a range
318249a
[receiver/awss3receiver]: Add progress notifications
adcharre May 13, 2024
1f6639e
Add example usage of notifications and OPAMP
adcharre May 14, 2024
8653789
Update go.mod
adcharre May 14, 2024
4b011b1
Add makefile to notifications_example
adcharre May 14, 2024
bd8c837
Move example to examples directory
adcharre May 14, 2024
c376679
Fix go version
adcharre May 14, 2024
ead72ea
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre May 14, 2024
d0a8f34
Fix unit tests
adcharre May 14, 2024
64146e0
Correct package name
adcharre May 14, 2024
55077c9
Change type to enhancement
adcharre May 14, 2024
242f402
Fix checkapi issue
adcharre May 14, 2024
b81289d
go mod tidy
adcharre May 14, 2024
5e2f800
gci s3reader_test.go
adcharre May 14, 2024
d064eba
Fix race condition in notifications test
adcharre May 14, 2024
1aae6bf
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre May 14, 2024
3802e56
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Jul 5, 2024
8807e6c
Update to use OTLP Logs to send status
adcharre Jul 9, 2024
f0d001a
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Jul 10, 2024
477da98
Fix lint issues
adcharre Jul 10, 2024
cbcdfb3
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Jul 10, 2024
515e145
Fix deps
adcharre Jul 10, 2024
e656be9
Lint fixes
adcharre Jul 10, 2024
e11646f
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Jul 10, 2024
c90c948
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Jul 11, 2024
9d15fca
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Jul 17, 2024
e052737
Update receiver.go
adcharre Jul 19, 2024
30203f7
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Jul 19, 2024
b320431
Unexport StatusNotification
adcharre Jul 30, 2024
3a09647
Update receiver/awss3receiver/notifications_test.go
adcharre Jul 30, 2024
29fe6d9
Update receiver/awss3receiver/README.md
adcharre Jul 30, 2024
a28fd2c
Rename opamp config setting to opampextension
adcharre Jul 30, 2024
f401880
Merge branch 'awss3receiver_notifications' of https://github.com/adch…
adcharre Jul 30, 2024
787ac6b
Add retry logic
adcharre Jul 30, 2024
dccdce6
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Jul 30, 2024
92f981a
Fix race conditions in notification tests
adcharre Aug 6, 2024
baaabe0
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Aug 6, 2024
e3a9e98
Update examples go.mod
adcharre Aug 6, 2024
899b6de
Update go.sum
adcharre Aug 6, 2024
af20364
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Aug 9, 2024
6f781d9
On context done send failure notification
adcharre Aug 9, 2024
fb739c3
Update example go.mod
adcharre Aug 9, 2024
7236b62
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Aug 10, 2024
8003f70
Update example go.mod
adcharre Aug 10, 2024
4d3654e
Move the notifications example to the examples/ directory
adcharre Aug 16, 2024
54cbf7e
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Aug 16, 2024
df8f44d
Update examples go.mod
adcharre Aug 16, 2024
5d9bdd6
Delete Makefile
adcharre Aug 16, 2024
f9a0df3
Remove example
adcharre Aug 17, 2024
bef04e1
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Aug 17, 2024
9db5a61
Update README.md
adcharre Aug 17, 2024
30a9191
Change notification time types to int64
adcharre Aug 20, 2024
94f72d6
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Aug 20, 2024
534f6f3
Update notifications_test.go
adcharre Aug 20, 2024
e05e447
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Aug 20, 2024
e14e3ef
Update README.md
adcharre Aug 21, 2024
6fd5ee7
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Aug 25, 2024
a2f0c34
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Sep 3, 2024
579d2ca
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Sep 3, 2024
669fea5
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Sep 4, 2024
0279497
Update notifications_test.go
adcharre Sep 4, 2024
d5b0231
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Sep 11, 2024
cec35c4
fix testifylint issues
adcharre Sep 11, 2024
aca9467
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Sep 11, 2024
6721beb
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Sep 20, 2024
1773327
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Sep 20, 2024
00818ed
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Sep 21, 2024
8fc74f4
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Sep 25, 2024
d63e5c8
Add nil handler check
adcharre Sep 25, 2024
685030c
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Sep 27, 2024
49c82c4
Check handler != nil before shutdown
adcharre Oct 1, 2024
3f8d1d9
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Oct 1, 2024
73b3d37
Merge remote-tracking branch 'upstream/main' into awss3receiver_notif…
adcharre Oct 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/awss3receiver_notifications.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: awss3receiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: 'Add support for monitoring the progress of ingesting data from an S3 bucket via OpAMP custom messages.'

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [30750]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
25 changes: 24 additions & 1 deletion receiver/awss3receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ The following exporter configuration parameters are supported.
| `encodings:` | An array of entries with the following properties: | | Optional |
| `extension` | Extension to use for decoding a key with a matching suffix. | | Required |
| `suffix` | Key suffix to match against. | | Required |
| `notifications:` | | | |
| `opampextension` | Name of the OpAMP Extension to use to send ingest progress notifications. | | |

### Time format for `starttime` and `endtime`
The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data.
Expand Down Expand Up @@ -67,4 +69,25 @@ receivers:
encodings:
- extension: text_encoding
suffix: ".txt"
```
```

## Notifications
The receiver can send notifications of ingest progress to an OpAmp server using the custom message capability of
"org.opentelemetry.collector.receiver.awss3" and message type "TimeBasedIngestStatus".
The format of the notifications is a ProtoBuf formatted OLTP logs message with a single Log Record. The `body` of the
record is set to `status` and the timestamp of the record is used to hold the ingest time. The record also has the
following attributes:

| Attribute | Description |
|:------------------|:--------------------------------------------------------------------------------|
| `telemetry_type` | The type of telemetry being ingested. One of "traces", "metrics", or "logs". |
| `ingest_status` | The status of the data ingestion. One of "ingesting", "failed", or "completed". |
| `start_time` | The time to start retrieving data as an Int64, nanoseconds since Unix epoch. |
| `end_time` | The time to stop retrieving data as an Int64, nanoseconds since Unix epoch. |
| `failure_message` | Error message if `ingest_status` is "failed". |

The "ingesting" status is sent at the beginning of the ingest process before data has been retrieved for the specified time.
If during the processing of the data an error occurs a status message with `ingest_status` set to "failed" status with
the time of the data being ingested when the failure occurred.
If the ingest process completes successfully a status message with `ingest_status` set to "completed" is sent.

13 changes: 9 additions & 4 deletions receiver/awss3receiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,22 @@ type S3DownloaderConfig struct {
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
}

type Notifications struct {
OpAMP *component.ID `mapstructure:"opampextension"`
}

type Encoding struct {
Extension component.ID `mapstructure:"extension"`
Suffix string `mapstructure:"suffix"`
}

// Config defines the configuration for the file receiver.
type Config struct {
S3Downloader S3DownloaderConfig `mapstructure:"s3downloader"`
StartTime string `mapstructure:"starttime"`
EndTime string `mapstructure:"endtime"`
Encodings []Encoding `mapstructure:"encodings"`
S3Downloader S3DownloaderConfig `mapstructure:"s3downloader"`
StartTime string `mapstructure:"starttime"`
EndTime string `mapstructure:"endtime"`
Encodings []Encoding `mapstructure:"encodings"`
Notifications Notifications `mapstructure:"notifications"`
}

const (
Expand Down
5 changes: 4 additions & 1 deletion receiver/awss3receiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func TestConfig_Validate_Valid(t *testing.T) {
func TestLoadConfig(t *testing.T) {
cm, err := confmaptest.LoadConf(filepath.Join("testdata", "config.yaml"))
require.NoError(t, err)

opampExtension := component.NewIDWithName(component.MustNewType("opamp"), "bar")
tests := []struct {
id component.ID
expected component.Config
Expand Down Expand Up @@ -89,6 +89,9 @@ func TestLoadConfig(t *testing.T) {
Suffix: "nop",
},
},
Notifications: Notifications{
OpAMP: &opampExtension,
},
},
},
}
Expand Down
4 changes: 4 additions & 0 deletions receiver/awss3receiver/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.27.39
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.25
github.com/aws/aws-sdk-go-v2/service/s3 v1.63.3
github.com/open-telemetry/opamp-go v0.15.0
github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages v0.110.1-0.20241004063257-d6cd5935eefc
github.com/stretchr/testify v1.9.0
go.opentelemetry.io/collector/component v0.110.1-0.20241004063257-d6cd5935eefc
go.opentelemetry.io/collector/confmap v1.16.1-0.20241004063257-d6cd5935eefc
Expand Down Expand Up @@ -71,3 +73,5 @@ require (
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages => ../../extension/opampcustommessages
2 changes: 2 additions & 0 deletions receiver/awss3receiver/go.sum

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

121 changes: 121 additions & 0 deletions receiver/awss3receiver/notifications.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package awss3receiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awss3receiver"

import (
"context"
"errors"
"fmt"
"time"

"github.com/open-telemetry/opamp-go/client/types"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/extension/opampcustommessages"
)

const (
evan-bradley marked this conversation as resolved.
Show resolved Hide resolved
IngestStatusCompleted = "completed"
IngestStatusFailed = "failed"
IngestStatusIngesting = "ingesting"
CustomCapability = "org.opentelemetry.collector.receiver.awss3"
maxNotificationAttempts = 3
)

type statusNotification struct {
TelemetryType string
IngestStatus string
StartTime time.Time
EndTime time.Time
IngestTime time.Time
FailureMessage string
}

type statusNotifier interface {
Start(ctx context.Context, host component.Host) error
Shutdown(ctx context.Context) error
SendStatus(ctx context.Context, message statusNotification)
}

type opampNotifier struct {
logger *zap.Logger
opampExtensionID component.ID
handler opampcustommessages.CustomCapabilityHandler
}

func newNotifier(config *Config, logger *zap.Logger) statusNotifier {
if config.Notifications.OpAMP != nil {
return &opampNotifier{opampExtensionID: *config.Notifications.OpAMP, logger: logger}
}
return nil
}

func (n *opampNotifier) Start(_ context.Context, host component.Host) error {
ext, ok := host.GetExtensions()[n.opampExtensionID]
if !ok {
return fmt.Errorf("extension %q does not exist", n.opampExtensionID)
}

registry, ok := ext.(opampcustommessages.CustomCapabilityRegistry)
if !ok {
return fmt.Errorf("extension %q is not a custom message registry", n.opampExtensionID)
}

handler, err := registry.Register(CustomCapability)
if err != nil {
return fmt.Errorf("failed to register custom capability: %w", err)
}
if handler == nil {
return errors.New("custom capability handler is nil")
}
n.handler = handler
return nil
}

func (n *opampNotifier) Shutdown(_ context.Context) error {
if n.handler != nil {
n.handler.Unregister()
}
return nil
}

func (n *opampNotifier) SendStatus(_ context.Context, message statusNotification) {
logs := plog.NewLogs()
log := logs.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
log.Body().SetStr("status")
attributes := log.Attributes()
attributes.PutStr("telemetry_type", message.TelemetryType)
attributes.PutStr("ingest_status", message.IngestStatus)
attributes.PutInt("start_time", int64(pcommon.NewTimestampFromTime(message.StartTime)))
attributes.PutInt("end_time", int64(pcommon.NewTimestampFromTime(message.EndTime)))
log.SetTimestamp(pcommon.NewTimestampFromTime(message.IngestTime))

if message.FailureMessage != "" {
attributes.PutStr("failure_message", message.FailureMessage)
}

marshaler := plog.ProtoMarshaler{}
bytes, err := marshaler.MarshalLogs(logs)
if err != nil {
return
}
for attempt := 0; attempt < maxNotificationAttempts; attempt++ {
sendingChan, sendingErr := n.handler.SendMessage("TimeBasedIngestStatus", bytes)
switch {
case sendingErr == nil:
return
case errors.Is(sendingErr, types.ErrCustomMessagePending):
<-sendingChan
default:
// The only other errors returned by the OpAmp extension are unrecoverable, ie ErrCustomCapabilityNotSupported
// so just log an error and return.
n.logger.Error("Failed to send notification", zap.Error(sendingErr), zap.Int("attempt", attempt))
return
}
}
n.logger.Error("Failed to send notification after multiple attempts", zap.Int("max_attempts", maxNotificationAttempts))
}
Loading