Skip to content

Commit

Permalink
Enable selective schema caching (#151)
Browse files Browse the repository at this point in the history
* Enable selective schema caching
* Use uniform variable naming
  • Loading branch information
mostafa authored Aug 12, 2022
1 parent 20859bc commit 239f243
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 41 deletions.
14 changes: 8 additions & 6 deletions module.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,10 @@ func init() {

type (
Kafka struct {
vu modules.VU
metrics kafkaMetrics
exports *goja.Object
vu modules.VU
metrics kafkaMetrics
exports *goja.Object
schemaCache map[string]*Schema
}
RootModule struct{}
Module struct {
Expand Down Expand Up @@ -97,9 +98,10 @@ func (*RootModule) NewModuleInstance(virtualUser modules.VU) modules.Instance {
// Create a new Kafka module.
moduleInstance := &Module{
Kafka: &Kafka{
vu: virtualUser,
metrics: metrics,
exports: runtime.NewObject(),
vu: virtualUser,
metrics: metrics,
exports: runtime.NewObject(),
schemaCache: make(map[string]*Schema),
},
}

Expand Down
94 changes: 59 additions & 35 deletions schema_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ type BasicAuth struct {
}

type SchemaRegistryConfig struct {
URL string `json:"url"`
BasicAuth BasicAuth `json:"basicAuth"`
TLS TLSConfig `json:"tls"`
EnableCaching bool `json:"enableCaching"`
URL string `json:"url"`
BasicAuth BasicAuth `json:"basicAuth"`
TLS TLSConfig `json:"tls"`
}

const (
Expand All @@ -42,14 +43,15 @@ const (
// Schema is a wrapper around the schema registry schema.
// The Codec() and JsonSchema() methods will return the respective codecs (duck-typing).
type Schema struct {
ID int `json:"id"`
Schema string `json:"schema"`
SchemaType *srclient.SchemaType `json:"schemaType"`
Version int `json:"version"`
References []srclient.Reference `json:"references"`
Subject string `json:"subject"`
codec *goavro.Codec
jsonSchema *jsonschema.Schema
EnableCaching bool `json:"enableCaching"`
ID int `json:"id"`
Schema string `json:"schema"`
SchemaType *srclient.SchemaType `json:"schemaType"`
Version int `json:"version"`
References []srclient.Reference `json:"references"`
Subject string `json:"subject"`
codec *goavro.Codec
jsonSchema *jsonschema.Schema
}

type SubjectNameConfig struct {
Expand Down Expand Up @@ -238,18 +240,17 @@ func (k *Kafka) schemaRegistryClientClass(call goja.ConstructorCall) *goja.Objec

// schemaRegistryClient creates a schemaRegistryClient instance
// with the given configuration. It will also configure auth and TLS credentials if exists.
func (k *Kafka) schemaRegistryClient(
configuration *SchemaRegistryConfig) *srclient.SchemaRegistryClient {
func (k *Kafka) schemaRegistryClient(config *SchemaRegistryConfig) *srclient.SchemaRegistryClient {
runtime := k.vu.Runtime()
var srClient *srclient.SchemaRegistryClient

tlsConfig, err := GetTLSConfig(configuration.TLS)
tlsConfig, err := GetTLSConfig(config.TLS)
if err != nil {
// Ignore the error if we're not using TLS
if err.Code != noTLSConfig {
common.Throw(runtime, err)
}
srClient = srclient.CreateSchemaRegistryClient(configuration.URL)
srClient = srclient.CreateSchemaRegistryClient(config.URL)
}

if tlsConfig != nil {
Expand All @@ -259,18 +260,29 @@ func (k *Kafka) schemaRegistryClient(
},
}
srClient = srclient.CreateSchemaRegistryClientWithOptions(
configuration.URL, httpClient, ConcurrentRequests)
config.URL, httpClient, ConcurrentRequests)
}

if configuration.BasicAuth.Username != "" && configuration.BasicAuth.Password != "" {
srClient.SetCredentials(configuration.BasicAuth.Username, configuration.BasicAuth.Password)
if config.BasicAuth.Username != "" && config.BasicAuth.Password != "" {
srClient.SetCredentials(config.BasicAuth.Username, config.BasicAuth.Password)
}

// The default value for a boolean is false, so the caching
// feature of the srclient package will be disabled.
srClient.CachingEnabled(config.EnableCaching)

return srClient
}

// getSchema returns the schema for the given subject and schema ID and version.
func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema) *Schema {
// If EnableCache is set, check if the schema is in the cache.
if schema.EnableCaching {
if schema, ok := k.schemaCache[schema.Subject]; ok {
return schema
}
}

runtime := k.vu.Runtime()
// The client always caches the schema.
var schemaInfo *srclient.Schema
Expand All @@ -282,20 +294,27 @@ func (k *Kafka) getSchema(client *srclient.SchemaRegistryClient, schema *Schema)
schemaInfo, err = client.GetSchemaByVersion(
schema.Subject, schema.Version)
}
if err != nil {

if err == nil {
wrappedSchema := &Schema{
EnableCaching: schema.EnableCaching,
ID: schemaInfo.ID(),
Version: schemaInfo.Version(),
Schema: schemaInfo.Schema(),
SchemaType: schemaInfo.SchemaType(),
References: schemaInfo.References(),
Subject: schema.Subject,
}
// If the Cache is set, cache the schema.
if wrappedSchema.EnableCaching {
k.schemaCache[wrappedSchema.Subject] = wrappedSchema
}
return wrappedSchema
} else {
err := NewXk6KafkaError(schemaNotFound, "Failed to get schema from schema registry", err)
common.Throw(runtime, err)
return nil
}

return &Schema{
ID: schemaInfo.ID(),
Version: schemaInfo.Version(),
Schema: schemaInfo.Schema(),
SchemaType: schemaInfo.SchemaType(),
References: schemaInfo.References(),
Subject: schema.Subject,
}
}

// createSchema creates a new schema in the schema registry.
Expand All @@ -312,14 +331,19 @@ func (k *Kafka) createSchema(client *srclient.SchemaRegistryClient, schema *Sche
return nil
}

return &Schema{
ID: schemaInfo.ID(),
Version: schemaInfo.Version(),
Schema: schemaInfo.Schema(),
SchemaType: schemaInfo.SchemaType(),
References: schemaInfo.References(),
Subject: schema.Subject,
wrappedSchema := &Schema{
EnableCaching: schema.EnableCaching,
ID: schemaInfo.ID(),
Version: schemaInfo.Version(),
Schema: schemaInfo.Schema(),
SchemaType: schemaInfo.SchemaType(),
References: schemaInfo.References(),
Subject: schema.Subject,
}
if schema.EnableCaching {
k.schemaCache[schema.Subject] = wrappedSchema
}
return wrappedSchema
}

// getSubjectName returns the subject name for the given schema and topic.
Expand Down

0 comments on commit 239f243

Please sign in to comment.