Skip to content

Commit

Permalink
Merge master
Browse files Browse the repository at this point in the history
  • Loading branch information
PratRanj07 committed Dec 19, 2024
2 parents c03f77f + bb108e5 commit cf9ef1c
Show file tree
Hide file tree
Showing 23 changed files with 772 additions and 91 deletions.
43 changes: 43 additions & 0 deletions .github/pull_request_template.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<!--
Suggested PR template: Fill/delete/add sections as needed. Optionally delete any commented block.
-->
What
----
<!--
Briefly describe **what** you have changed and **why**.
Optionally include implementation strategy.
-->

Checklist
------------------
- [ ] Contains customer facing changes? Including API/behavior changes <!-- This can help identify if it has introduced any breaking changes -->
- [ ] Did you add sufficient unit test and/or integration test coverage for this PR?
- If not, please explain why it is not required

References
----------
JIRA:
<!--
Copy&paste links: to Jira ticket, other PRs, issues, Slack conversations...
For code bumps: link to PR, tag or GitHub `/compare/master...master`
-->

Test & Review
------------
<!--
Has it been tested? how?
Copy&paste any handy instructions, steps or requirements that can save time to the reviewer or any reader.
-->

Open questions / Follow-ups
--------------------------
<!--
Optional: anything open to discussion for the reviewer, out of scope, or follow-ups.
-->

<!--
Review stakeholders
------------------
<!--
Optional: mention stakeholders or if special context that is required to review.
-->
2 changes: 1 addition & 1 deletion schemaregistry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func NewConfig(url string) *Config {
c.ConnectionTimeoutMs = 10000
c.RequestTimeoutMs = 10000

c.MaxRetries = 2
c.MaxRetries = 3
c.RetriesWaitMs = 1000
c.RetriesMaxWaitMs = 20000

Expand Down
2 changes: 1 addition & 1 deletion schemaregistry/internal/client_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (

// ClientConfig is used to pass multiple configuration options to the Schema Registry client.
type ClientConfig struct {
// SchemaRegistryURL determines the URL of Schema Registry.
// SchemaRegistryURL is a comma-space separated list of URLs for the Schema Registry.
SchemaRegistryURL string

// BasicAuthUserInfo specifies the user info in the form of {username}:{password}.
Expand Down
83 changes: 51 additions & 32 deletions schemaregistry/internal/rest_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func NewRequest(method string, endpoint string, body interface{}, arguments ...i

// RestService represents a REST client
type RestService struct {
url *url.URL
urls []*url.URL
headers http.Header
maxRetries int
retriesWaitMs int
Expand All @@ -124,21 +124,22 @@ type RestService struct {
// NewRestService returns a new REST client for the Confluent Schema Registry
func NewRestService(conf *ClientConfig) (*RestService, error) {
urlConf := conf.SchemaRegistryURL
u, err := url.Parse(urlConf)

if err != nil {
return nil, err
urlStrs := strings.Split(urlConf, ",")
urls := make([]*url.URL, len(urlStrs))
for i, urlStr := range urlStrs {
u, err := url.Parse(strings.TrimSpace(urlStr))
if err != nil {
return nil, err
}
urls[i] = u
}

headers, err := NewAuthHeader(u, conf)
headers, err := NewAuthHeader(urls[0], conf)
if err != nil {
return nil, err
}

headers.Add("Content-Type", "application/vnd.schemaregistry.v1+json")
if err != nil {
return nil, err
}

if conf.HTTPClient == nil {
transport, err := configureTransport(conf)
Expand All @@ -155,7 +156,7 @@ func NewRestService(conf *ClientConfig) (*RestService, error) {
}

return &RestService{
url: u,
urls: urls,
headers: headers,
maxRetries: conf.MaxRetries,
retriesWaitMs: conf.RetriesWaitMs,
Expand Down Expand Up @@ -337,19 +338,51 @@ func NewAuthHeader(service *url.URL, conf *ClientConfig) (http.Header, error) {
return header, err
}

// HandleRequest sends a HTTP(S) request to the Schema Registry, placing results into the response object
// HandleRequest sends a request to the Schema Registry, iterating over the list of URLs
func (rs *RestService) HandleRequest(request *API, response interface{}) error {
urlPath := path.Join(rs.url.Path, fmt.Sprintf(request.endpoint, request.arguments...))
endpoint, err := rs.url.Parse(urlPath)
if err != nil {
var resp *http.Response
var err error
for i, u := range rs.urls {
resp, err = rs.HandleHTTPRequest(u, request)
if err != nil {
if i == len(rs.urls)-1 {
return err
}
continue
}
if isSuccess(resp.StatusCode) || !isRetriable(resp.StatusCode) || i >= rs.maxRetries {
break
}
}
defer resp.Body.Close()
if isSuccess(resp.StatusCode) {
if err = json.NewDecoder(resp.Body).Decode(response); err != nil {
return err
}
return nil
}

var failure rest.Error
if err = json.NewDecoder(resp.Body).Decode(&failure); err != nil {
return err
}

return &failure
}

// HandleHTTPRequest sends a HTTP(S) request to the Schema Registry, placing results into the response object
func (rs *RestService) HandleHTTPRequest(url *url.URL, request *API) (*http.Response, error) {
urlPath := path.Join(url.Path, fmt.Sprintf(request.endpoint, request.arguments...))
endpoint, err := url.Parse(urlPath)
if err != nil {
return nil, err
}

var readCloser io.ReadCloser
if request.body != nil {
outbuf, err := json.Marshal(request.body)
if err != nil {
return err
return nil, err
}
readCloser = ioutil.NopCloser(bytes.NewBuffer(outbuf))
}
Expand All @@ -365,30 +398,16 @@ func (rs *RestService) HandleRequest(request *API, response interface{}) error {
for i := 0; i < rs.maxRetries+1; i++ {
resp, err = rs.Do(req)
if err != nil {
return err
return nil, err
}

if isSuccess(resp.StatusCode) || !isRetriable(resp.StatusCode) || i >= rs.maxRetries {
break
return resp, nil
}

time.Sleep(rs.fullJitter(i))
}

defer resp.Body.Close()
if resp.StatusCode == 200 {
if err = json.NewDecoder(resp.Body).Decode(response); err != nil {
return err
}
return nil
}

var failure rest.Error
if err := json.NewDecoder(resp.Body).Decode(&failure); err != nil {
return err
}

return &failure
return nil, fmt.Errorf("failed to send request after %d retries", rs.maxRetries)
}

func (rs *RestService) fullJitter(retriesAttempted int) time.Duration {
Expand Down
33 changes: 15 additions & 18 deletions schemaregistry/rules/cel/cel_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,18 @@ func init() {

// Register registers the CEL rule executor
func Register() {
serde.RegisterRuleExecutor(NewExecutor())
serde.RegisterRuleExecutor(NewFieldExecutor())
}

// NewExecutor creates a new CEL rule executor
func NewExecutor() serde.RuleExecutor {
env, _ := DefaultEnv()

e := &Executor{
return &Executor{
env: env,
cache: map[string]cel.Program{},
}
serde.RegisterRuleExecutor(e)

a := &serde.AbstractFieldRuleExecutor{}
f := &FieldExecutor{
AbstractFieldRuleExecutor: *a,
executor: Executor{
env: env,
cache: map[string]cel.Program{},
},
}
f.FieldRuleExecutor = f
serde.RegisterRuleExecutor(f)
}

// Executor is a CEL rule executor
Expand Down Expand Up @@ -199,19 +193,22 @@ func typeToCELType(arg interface{}) *cel.Type {

func (c *Executor) newProgram(expr string, msg interface{}, decls []cel.EnvOption) (cel.Program, error) {
typ := reflect.TypeOf(msg)
if typ.Kind() == reflect.Pointer {
if typ.Kind() == reflect.Pointer || typ.Kind() == reflect.Interface {
typ = typ.Elem()
}
protoType, ok := msg.(proto.Message)
var declType cel.EnvOption
if ok {
declType = cel.Types(protoType)
} else {
} else if typ.Kind() == reflect.Struct {
declType = ext.NativeTypes(typ)
}
envOptions := make([]cel.EnvOption, len(decls))
copy(envOptions, decls)
envOptions = append(envOptions, declType)
envOptions := decls
if declType != nil {
envOptions = make([]cel.EnvOption, len(decls))
copy(envOptions, decls)
envOptions = append(envOptions, declType)
}
env, err := c.env.Extend(envOptions...)
if err != nil {
return nil, err
Expand Down
17 changes: 17 additions & 0 deletions schemaregistry/rules/cel/cel_field_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,25 @@ package cel
import (
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/serde"
"github.com/google/cel-go/cel"
)

// NewFieldExecutor creates a new CEL field rule executor
func NewFieldExecutor() serde.RuleExecutor {
env, _ := DefaultEnv()

a := &serde.AbstractFieldRuleExecutor{}
f := &FieldExecutor{
AbstractFieldRuleExecutor: *a,
executor: Executor{
env: env,
cache: map[string]cel.Program{},
},
}
f.FieldRuleExecutor = f
return f
}

// FieldExecutor is a CEL field rule executor
type FieldExecutor struct {
serde.AbstractFieldRuleExecutor
Expand Down
4 changes: 2 additions & 2 deletions schemaregistry/rules/encryption/awskms/aws_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewClient(keyURI string, creds aws.CredentialsProvider) (registry.KMSClient
// Supported returns true if keyURI starts with the URI prefix provided when
// creating the client.
func (c *awsClient) Supported(keyURI string) bool {
return strings.HasPrefix(keyURI, prefix)
return strings.HasPrefix(keyURI, c.keyURI)
}

// GetAEAD returns an implementation of the AEAD interface which performs
Expand All @@ -64,7 +64,7 @@ func (c *awsClient) Supported(keyURI string) bool {
// See https://docs.aws.amazon.com/IAM/latest/UserGuide/reference-arns.html
func (c *awsClient) GetAEAD(keyURI string) (tink.AEAD, error) {
if !c.Supported(keyURI) {
return nil, fmt.Errorf("keyURI must start with prefix %s, but got %s", prefix, keyURI)
return nil, fmt.Errorf("keyURI must start with prefix %s, but got %s", c.keyURI, keyURI)
}
uri := strings.TrimPrefix(keyURI, prefix)
return NewAEAD(uri, c.creds)
Expand Down
62 changes: 57 additions & 5 deletions schemaregistry/rules/encryption/awskms/aws_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,26 @@
package awskms

import (
"context"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/credentials/stscreds"
"github.com/aws/aws-sdk-go-v2/service/sts"
"github.com/confluentinc/confluent-kafka-go/v2/schemaregistry/rules/encryption"
"github.com/tink-crypto/tink-go/v2/core/registry"
"os"
"strings"
)

const (
prefix = "aws-kms://"
accessKeyID = "access.key.id"
secretAccessKey = "secret.access.key"
profile = "profile"
roleArn = "role.arn"
roleSessionName = "role.session.name"
roleExternalID = "role.external.id"
)

func init() {
Expand All @@ -51,13 +61,55 @@ func (l *awsDriver) NewKMSClient(conf map[string]string, keyURL *string) (regist
if keyURL != nil {
uriPrefix = *keyURL
}
arn := conf[roleArn]
if arn == "" {
arn = os.Getenv("AWS_ROLE_ARN")
}
sessionName := conf[roleSessionName]
if sessionName == "" {
sessionName = os.Getenv("AWS_ROLE_SESSION_NAME")
}
externalID := conf[roleExternalID]
if externalID == "" {
externalID = os.Getenv("AWS_ROLE_EXTERNAL_ID")
}
var creds aws.CredentialsProvider
key, ok := conf[accessKeyID]
if ok {
secret, ok := conf[secretAccessKey]
if ok {
creds = credentials.NewStaticCredentialsProvider(key, secret, "")
key := conf[accessKeyID]
secret := conf[secretAccessKey]
sourceProfile := conf[profile]
if key != "" && secret != "" {
creds = credentials.NewStaticCredentialsProvider(key, secret, "")
} else if sourceProfile != "" {
cfg, err := config.LoadDefaultConfig(context.Background(),
config.WithSharedConfigProfile(sourceProfile),
)
if err != nil {
return nil, err
}
creds = cfg.Credentials
}
if arn != "" {
region, err := getRegion(strings.TrimPrefix(uriPrefix, prefix))
if err != nil {
return nil, err
}
stsSvc := sts.New(sts.Options{
Credentials: creds,
Region: region,
})
if sessionName == "" {
sessionName = "confluent-encrypt"
}
var extID *string
if externalID != "" {
extID = &externalID
}
creds = stscreds.NewAssumeRoleProvider(stsSvc, arn, func(o *stscreds.AssumeRoleOptions) {
o.RoleSessionName = sessionName
o.ExternalID = extID
})
creds = aws.NewCredentialsCache(creds)
}

return NewClient(uriPrefix, creds)
}
Loading

0 comments on commit cf9ef1c

Please sign in to comment.