Skip to content

Commit

Permalink
stubbed out revised streaming functions
Browse files Browse the repository at this point in the history
  • Loading branch information
nr-swilloughby committed Mar 21, 2024
1 parent 75d681e commit b6c1693
Showing 1 changed file with 75 additions and 27 deletions.
102 changes: 75 additions & 27 deletions v3/integrations/nrawsbedrock/nrawsbedrock.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ package nrawsbedrock
import (
"context"
"encoding/json"
"fmt"
"runtime/debug"
"strings"
"sync"
Expand Down Expand Up @@ -98,51 +99,98 @@ type ResponseStream struct {
params *bedrockruntime.InvokeModelInput

// The model output
response *bedrockruntime.InvokeModelWithResponseStreamOutput
Response *bedrockruntime.InvokeModelWithResponseStreamOutput
}

//
// InvokeModelWithResponseStream works as InvokeModel does, but the response is returned as a stream.
// In summary, given a bedrockruntime.Client b, where you would normally call the AWS method
// b.InvokeModelWithResponseStream(c, p, f...)
// InvokeModelWithResponseStream invokes a model but unlike the InvokeModel method, the data returned
// is a stream of multiple events instead of a single response value.
// This function is the analogue of the bedrockruntime library InvokeModelWithResponseStream function,
// so that, given a bedrockruntime.Client b, where you would normally call the AWS method
// response, err := b.InvokeModelWithResponseStream(c, p, f...)
// You instead invoke the New Relic InvokeModelWithResponseStream function as:
// nrbedrock.InvokeModelWithResponseStream(app, b, c, p, f...)
// rstream, err := nrbedrock.InvokeModelWithResponseStream(app, b, c, p, f...)
// where app is your New Relic Application value.
//
// If using the bedrockruntime library directly, you would then process the response stream value
// (the response variable in the above example), iterating over the provided channel where the stream
// data appears until it is exhausted, and then calling Close() on the stream (see the bedrock API
// documentation for details).
//
// When using the New Relic nrawsbedrock integration, this response value is available as
// rstream.Response. You would perform the same operations as you would directly with the bedrock API
// once you have that value.
// Since this means control has passed back to your code for processing of the stream data, you need to
// add instrumentation calls to your processing code:
// rstream.RecordEvent(content) // for each event received from the stream
// rstream.Close() // when you are finished and are going to close the stream
//
// However, see ProcessModelWithResponseStream for an easier alternative.
//
// Either start a transaction on your own and add it to the context c passed into this function, or
// a transaction will be started for you that lasts only for the duration of the model invocation.
//
// You may elect to have our function collect the stream's output for you, instrumenting the operations
// along the way. To do this, pass a callback function with the invocation signature
// callback(app *newrelic.Application, txn *newrelic.Transaction, ctx context.Context, part []byte) error
// This will be called for every event read from the response stream, allowing your application
// to collect and use that streamed data as it arrives. If your callback function returns a non-nil error
// value, the stream reading will terminate immediately.
func InvokeModelWithResponseStream(app *newrelic.Application, brc *bedrockruntime.Client, ctx context.Context, params *bedrockruntime.InvokeModelInput, optFns ...func(*bedrockruntime.Options)) (ResponseStream, error) {
return InvokeModelWithResponseStreamAttributes(app, brc, ctx, params, nil, optFns...)
}

//
// InvokeModelWithResponseStreamAttributes is identical to InvokeModelWithResponseStream except that
// it adds the attrs parameter, which is a
// map of strings to values of any type. This map holds any custom attributes you wish to add to the reported metrics
// relating to this model invocation.
//
// Each key in the attrs map must begin with "llm."; if any of them do not, "llm." is automatically prepended to
// the attribute key before the metrics are sent out.
//
// Alternatively, if a nil value is passed for the callback function, we will assume that you want to do all
// the stream reading yourself. In that case you will need to make calls to the following functions as you
// read data in order for the instrumentation to function correctly.
// We recommend including at least "llm.conversation_id" in your attributes.
//
// RecordStreamResponseEvent(app, txn, ctx, part)
// for every part read from the output stream.
func InvokeModelWithResponseStreamAttributes(app *newrelic.Application, brc *bedrockruntime.Client, ctx context.Context, params *bedrockruntime.InvokeModelInput, attrs map[string]any, optFns ...func(*bedrockruntime.Options)) (ResponseStream, error) {
return ResponseStream{}, fmt.Errorf("not implemented")
}

//
// CompleteStreamResponse(app, txn, ctx)
// after reading all of the stream data you will be reading. This finishes up the instrumentation
// for the model invocation.
// RecordEvent records a single stream event as read from the data stream started by InvokeModelWithStreamResponse.
//
/*
func InvokeModelWithResponseStream(app *newrelic.Application, brc *bedrockruntime.Client, ctx context.Context, params *bedrockruntime.InvokeModelInput, optFns ...func(*bedrockruntime.Options)) (ResponseStream, error) {
func (s *ResponseStream) RecordEvent(data []byte) error {
return fmt.Errorf("not implemented")
}

func (s *ResponseStream) RecordEvent(data []byte) error {}
func (s *ResponseStream) Close() error {}
//
// Close finishes up the instrumentation for a response stream.
//
func (s *ResponseStream) Close() error {
return fmt.Errorf("not implemented")
}

func InvokeModelWithResponseStream(app *newrelic.Application, callback func(*newrelic.Application, *newrelic.Transaction, context.Context, []byte) error, brc *bedrockruntime.Client, ctx context.Context, params *bedrockruntime.InvokeModelInput, optFns ...func(*bedrockruntime.Options)) (*bedrockruntime.InvokeModelWithResponseStreamOutput, error) {
//
// ProcessModelWithResponseStream works just like InvokeModelWithResponseStream, except that
// it handles all the stream processing automatically for you. For each event received from
// the response stream, it will invoke the callback function you pass into the function call
// so that your application can act on the response data. When the stream is complete, the
// ProcessModelWithResponseStream call will return.
//
// If your callback function returns an error, the processing of the response stream will
// terminate at that point.
//
func ProcessModelWithResponseStream(app *newrelic.Application, brc *bedrockruntime.Client, ctx context.Context, callback func([]byte) error, params *bedrockruntime.InvokeModelInput, optFns ...func(*bedrockruntime.Options)) error {
return ProcessModelWithResponseStreamAttributes(app, brc, ctx, callback, params, nil, optFns...)
}

func RecordStreamResponseEvent() {}
func CompleteStreamResponse() {}
*/
//
// ProcessModelWithResponseStreamAttributes is identical to ProcessModelWithResponseStream except that
// it adds the attrs parameter, which is a
// map of strings to values of any type. This map holds any custom attributes you wish to add to the reported metrics
// relating to this model invocation.
//
// Each key in the attrs map must begin with "llm."; if any of them do not, "llm." is automatically prepended to
// the attribute key before the metrics are sent out.
//
// We recommend including at least "llm.conversation_id" in your attributes.
//
func ProcessModelWithResponseStreamAttributes(app *newrelic.Application, brc *bedrockruntime.Client, ctx context.Context, callback func([]byte) error, params *bedrockruntime.InvokeModelInput, attrs map[string]any, optFns ...func(*bedrockruntime.Options)) error {
return fmt.Errorf("not implemented")
}

//
// InvokeModel provides an instrumented interface through which to call the AWS Bedrock InvokeModel function.
Expand Down

0 comments on commit b6c1693

Please sign in to comment.