
Problem with schema registry cache- Workaround #146

Closed
oscar067 opened this issue Aug 5, 2022 · 5 comments · Fixed by #151
Labels
🐛 Bug Something isn't working


oscar067 commented Aug 5, 2022

Hi,

I noticed a very low message production rate when using Avro with the schema registry and 30 VUs (fewer than 100 messages per second).

My config looks like this:

  writer.produce({
    messages: messages,
    // this option generates a magic number error: valueSchema: valueSchema,
    config: kafkaConfiguration,
  });

where kafkaConfiguration is:

export const kafkaConfiguration = {
  consumer: {
    keyDeserializer: STRING_DESERIALIZER,
    valueDeserializer: AVRO_DESERIALIZER,
  },
  producer: {
    keySerializer: STRING_SERIALIZER,
    valueSerializer: AVRO_SERIALIZER,
  },
  schemaRegistry: {
    url: schemaRegistry,
    tls: {
      enableTls: enableTlsSchemaRegistry,
      insecureSkipTlsVerify: true,
      clientCertPem: "/certs/cert.pem",
      clientKeyPem:  "/certs/cert.key",
      serverCaPem:   "/certs/rootg3_b64.cer",
    },
  },
};

Taking a look at the library github.com/riferrei/srclient v0.5.4, I noticed this behaviour in the cache logic of schemaRegistryClient.go:

func (client *SchemaRegistryClient) getVersion(subject string, version string) (*Schema, error) {

	if client.getCachingEnabled() {
		cacheKey := cacheKey(subject, version)
		client.subjectSchemaCacheLock.RLock()
		cachedResult := client.subjectSchemaCache[cacheKey]
		client.subjectSchemaCacheLock.RUnlock()
		if cachedResult != nil {
			return cachedResult, nil
		}
	}

RLock has this doc:

// RLock locks rw for reading.
//
// It should not be used for recursive read locking; a blocked Lock
// call excludes new readers from acquiring the lock. See the
// documentation on the RWMutex type. 
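So every produced message pays for an RLock/RUnlock on a shared mutex, and with 30 VUs those operations all contend on the same lock. To illustrate the read-path cost, here is a micro-benchmark sketch of my own (not code from srclient), comparing the locked lookup with a plain read-only map lookup:

// cache_bench_test.go — run with: go test -bench . -cpu 30
package cache

import (
	"sync"
	"testing"
)

var (
	mu    sync.RWMutex
	cache = map[string]int{"my-subject": 1}
)

// Read path as in srclient's getVersion: every lookup takes the read lock.
func BenchmarkRWMutexRead(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			mu.RLock()
			_ = cache["my-subject"]
			mu.RUnlock()
		}
	})
}

// Read path of an unlocked cache after warm-up: no lock at all
// (safe only because nothing writes to the map concurrently).
func BenchmarkPlainMapRead(b *testing.B) {
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			_ = cache["my-subject"]
		}
	})
}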

So I decided to modify schema_registry.go a bit, like this, and now it works at more than 2000 messages per second with the same 30 VUs:

var cache = make(map[string]*srclient.Schema)

// GetSchema returns the schema for the given subject, schema ID, and version.
func GetSchema(
	client *srclient.SchemaRegistryClient, subject string, schema string, schemaType srclient.SchemaType, version int,
) (*srclient.Schema, *Xk6KafkaError) {
	// Serve the schema from the local cache if we already fetched it.
	// Note: a plain map is not safe for concurrent writes; see the
	// sync.Map discussion further down.
	if value, ok := cache[subject]; ok {
		return value, nil
	}

	var schemaInfo *srclient.Schema
	var err error
	// The default version of the schema is the latest version.
	if version == 0 {
		schemaInfo, err = client.GetLatestSchema(subject)
	} else {
		schemaInfo, err = client.GetSchemaByVersion(subject, version)
	}
	if err != nil {
		return nil, NewXk6KafkaError(schemaNotFound,
			"Failed to get schema from schema registry", err)
	}

	cache[subject] = schemaInfo
	return schemaInfo, nil
}

I tested it locally.

mostafa (Owner) commented Aug 5, 2022

Hey @oscar067,

I am completely refactoring this feature in this branch and removing many of these nuisances. It's still WIP. These issues will be addressed by my upcoming changes.

oscar067 (Author) commented Aug 5, 2022

I was checking those future changes, but if the tests make use of the schema registry cache, the problem will still be there.

A solution could be for github.com/riferrei/srclient v0.5.4 to change its implementation, or for a cache mechanism similar to the proposed one to be put in place.

Exceptions like the following were also thrown when running the original code (solved after the patch):

time="2022-08-05T10:44:10Z" level=warning msg="Failed to create or get schema, manually encoding the data" error="Failed to get schema from schema registry, OriginalError: %!w(*url.Error=&{Get
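As an aside, the "%!w(" in that message suggests a %w verb reaching a formatter other than fmt.Errorf, which prints the verb literally instead of wrapping the error. A minimal sketch reproducing the artifact (my illustration, not xk6-kafka code):

package main

import (
	"fmt"
	"io"
)

func main() {
	// %w is only understood by fmt.Errorf; other fmt functions
	// render it as "%!w(TYPE=VALUE)", as seen in the log above.
	fmt.Println(fmt.Errorf("wrapped: %w", io.EOF)) // wrapped: EOF
	fmt.Println(fmt.Sprintf("broken: %w", io.EOF)) // broken: %!w(*errors.errorString=&{EOF})
}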

mostafa (Owner) commented Aug 6, 2022

Interesting! Maybe we should ask the author of the original package for his opinion here?

@riferrei
What do you think about this? Should this change land in your library instead?

riferrei commented Aug 9, 2022

So what is the proposal here? Removing the usage of the caching from the library?

Apropos, it is possible to programmatically disable the caching, so all interactions with Schema Registry happen without any locks. That way you can remove it from the critical path. That said, it is a bit weird having srclient in the critical path of highly concurrent applications. Ideally, the schema should be managed before any interaction with producers/consumers, and then, once the schema is fetched, it is used within the concurrent transactions.

Let me know how I can help. I welcome changes in the project. If none of this helps, we can think about a PR to change the caching behavior. 🙂
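For example, something along these lines (just a sketch; the URL and subject are placeholders): disable the internal cache and resolve the schema once, before the concurrent producers start.

package main

import (
	"fmt"
	"log"

	"github.com/riferrei/srclient"
)

func main() {
	client := srclient.CreateSchemaRegistryClient("http://localhost:8081")
	// Turn off srclient's internal cache so no lock sits on the hot path.
	client.CachingEnabled(false)

	// Resolve the schema once, up front, outside the critical path.
	schema, err := client.GetLatestSchema("my-topic-value")
	if err != nil {
		log.Fatalf("failed to fetch schema: %v", err)
	}

	// Reuse the fetched *srclient.Schema from the concurrent producers.
	fmt.Println("schema ID:", schema.ID())
}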

@riferrei

oscar067 (Author) commented Aug 10, 2022

Caching is needed in github.com/riferrei/srclient; it makes no sense, in a performance test, to call the schema registry every time a message is generated.

I'll try testing the schemaRegistryClient.go code with https://pkg.go.dev/sync#Map and let you know the results. If that is not successful, for the xk6-kafka plugin I would recommend the workaround, plus warming up the producer in the setup method of the test.

"The Map type is optimized for two common use cases: (1) when the entry for a given key is only ever written once but read many times, as in caches that only grow, or (2) when multiple goroutines read, write, and overwrite entries for disjoint sets of keys. In these two cases, use of a Map may significantly reduce lock contention compared to a Go map paired with a separate Mutex or RWMutex."

Many thanks for the answer.

@mostafa mostafa added the 🐛 Bug Something isn't working label Aug 10, 2022
@mostafa mostafa moved this to Todo in xk6-kafka Aug 10, 2022
@mostafa mostafa moved this from Todo to Doing in xk6-kafka Aug 10, 2022
This was linked to pull requests Aug 10, 2022
Repository owner moved this from Doing to Test in xk6-kafka Aug 12, 2022
@mostafa mostafa moved this from Test to Release in xk6-kafka Aug 14, 2022