From e32152302c68582ab323c8e3693349d57a95e318 Mon Sep 17 00:00:00 2001 From: Nick Whelan Date: Tue, 18 Apr 2023 12:20:20 -0500 Subject: [PATCH 1/3] Fetch latest version for references, if not specified **Story:** As a developer using Confluent's schema registry, I want to always have a remote reference pointing at the latest remote version. There are cases where the version is unknown, or where it drifts between environments. By simply using the latest version, a large burden of managing versions across environments in our build file is lifted. As a developer using this plugin, I don't want to necessarily specify the remote schema version when specifying my references. I need a method of communicating this in my definition(s). **Implementation** A new method has been added to the Subject class, addReference(name: String, subject:String). When a reference is not specified with a verion, the compatibility and register tasks will fetch the latest version from the configured schema registry. --- README.md | 10 ++ .../github/imflog/schema/registry/Subject.kt | 5 + .../compatibility/CompatibilityTaskAction.kt | 2 + .../tasks/register/RegisterTaskAction.kt | 7 +- .../support/ReferenceCurrentVersionUtil.kt | 38 +++++ .../registry/SchemaRegistryPluginTest.kt | 30 ++-- .../registry/parser/AvroSchemaParserTest.kt | 19 +-- .../registry/parser/JsonSchemaParserTest.kt | 19 +-- .../CompatibilityTaskActionTest.kt | 137 ++++++++++++++---- .../tasks/download/DownloadTaskActionTest.kt | 72 +++++---- .../tasks/register/RegisterTaskActionTest.kt | 126 ++++++++++++---- 11 files changed, 326 insertions(+), 139 deletions(-) create mode 100644 src/main/kotlin/com/github/imflog/schema/registry/tasks/support/ReferenceCurrentVersionUtil.kt diff --git a/README.md b/README.md index 09ca90c..dfdc567 100644 --- a/README.md +++ b/README.md @@ -207,6 +207,16 @@ if you need information about the registered id. } } ``` +* For remote references, if no version is specified, the latest version of schema is fetched and used from the schema registry: +```groovy +schemaRegistry { + url = 'http://registry-url:8081' + register { + subject('avroWithRemoteReferences', '/absolutPath/dependent/path.avsc', "AVRO") + .addReference('avroSubject', 'avroSubjectType') + } +} +``` #### Avro Mixing local and remote references is perfectly fine for Avro without specific configurations. diff --git a/src/main/kotlin/com/github/imflog/schema/registry/Subject.kt b/src/main/kotlin/com/github/imflog/schema/registry/Subject.kt index f6038a4..6f1d284 100644 --- a/src/main/kotlin/com/github/imflog/schema/registry/Subject.kt +++ b/src/main/kotlin/com/github/imflog/schema/registry/Subject.kt @@ -15,6 +15,11 @@ data class Subject( return this } + fun addReference(name: String, subject:String): Subject { + references.add(SchemaReference(name, subject, 0)) + return this + } + fun addLocalReference(name: String, path: String): Subject { localReferences.add(LocalReference(name, path)) return this diff --git a/src/main/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskAction.kt b/src/main/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskAction.kt index ac65915..1ce9cd6 100644 --- a/src/main/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskAction.kt +++ b/src/main/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskAction.kt @@ -3,6 +3,7 @@ package com.github.imflog.schema.registry.tasks.compatibility import com.github.imflog.schema.registry.LoggingUtils.infoIfNotQuiet import com.github.imflog.schema.registry.Subject import com.github.imflog.schema.registry.parser.SchemaParser +import com.github.imflog.schema.registry.tasks.support.ReferenceCurrentVersionUtil.updateNonPositiveReferencesToCurrentVersion import com.github.imflog.schema.registry.toSchemaType import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException @@ -23,6 +24,7 @@ class CompatibilityTaskAction( var errorCount = 0 for ((subject, path, type, remoteReferences, localReferences) in subjects) { logger.debug("Loading schema for subject($subject) from $path.") + updateNonPositiveReferencesToCurrentVersion(client, remoteReferences) val isCompatible = try { val parsedSchema = SchemaParser .provide(type.toSchemaType(), client, rootDir) diff --git a/src/main/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskAction.kt b/src/main/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskAction.kt index 48bdd49..6f04934 100644 --- a/src/main/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskAction.kt +++ b/src/main/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskAction.kt @@ -5,6 +5,8 @@ import com.github.imflog.schema.registry.LoggingUtils.infoIfNotQuiet import com.github.imflog.schema.registry.SchemaType import com.github.imflog.schema.registry.Subject import com.github.imflog.schema.registry.parser.SchemaParser +import com.github.imflog.schema.registry.tasks.support.ReferenceCurrentVersionUtil +import com.github.imflog.schema.registry.tasks.support.ReferenceCurrentVersionUtil.updateNonPositiveReferencesToCurrentVersion import com.github.imflog.schema.registry.toSchemaType import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference @@ -27,8 +29,11 @@ class RegisterTaskAction( fun run(): Int { var errorCount = 0 writeOutputFileHeader() - subjects.forEach { (subject, path, type, references, localReferences) -> + subjects.forEach { (subject, path, type, references: List, localReferences) -> try { + updateNonPositiveReferencesToCurrentVersion( + client, references + ) val schemaId = registerSchema(subject, path, type.toSchemaType(), references, localReferences) writeRegisteredSchemaOutput(subject, path, schemaId) } catch (e: Exception) { diff --git a/src/main/kotlin/com/github/imflog/schema/registry/tasks/support/ReferenceCurrentVersionUtil.kt b/src/main/kotlin/com/github/imflog/schema/registry/tasks/support/ReferenceCurrentVersionUtil.kt new file mode 100644 index 0000000..91096c5 --- /dev/null +++ b/src/main/kotlin/com/github/imflog/schema/registry/tasks/support/ReferenceCurrentVersionUtil.kt @@ -0,0 +1,38 @@ +package com.github.imflog.schema.registry.tasks.support + +import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient +import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference +import org.gradle.api.GradleScriptException +import org.gradle.api.logging.Logging + +object ReferenceCurrentVersionUtil { + private val logger = Logging.getLogger(ReferenceCurrentVersionUtil::class.java) + + fun updateNonPositiveReferencesToCurrentVersion( + client: SchemaRegistryClient, + references: List + ) { + references.forEach { + if (it.version <= 0) { + it.version = getReferenceCurrentVersion(client, it) + } + } + } + + private fun getReferenceCurrentVersion( + client: SchemaRegistryClient, + reference: SchemaReference + ): Int { + logger.debug("Fetching latest remote version for '$reference'") + + val schemas = client.getSchemas(reference.subject, false, true); + if (schemas.size == 0) { + throw GradleScriptException("Did not find any schemas with the subject '${reference.subject}", Throwable()) + } + if (schemas.size > 1) { + throw GradleScriptException("Found more than one schema with the subject prefix '${reference.subject}.", Throwable()) + } + + return client.getVersion(reference.subject, schemas[0]) + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/github/imflog/schema/registry/SchemaRegistryPluginTest.kt b/src/test/kotlin/com/github/imflog/schema/registry/SchemaRegistryPluginTest.kt index ce2dc35..e05f3ef 100644 --- a/src/test/kotlin/com/github/imflog/schema/registry/SchemaRegistryPluginTest.kt +++ b/src/test/kotlin/com/github/imflog/schema/registry/SchemaRegistryPluginTest.kt @@ -3,32 +3,29 @@ package com.github.imflog.schema.registry import com.github.imflog.schema.registry.tasks.download.DownloadTask import org.assertj.core.api.Assertions import org.gradle.api.Project -import org.gradle.internal.impldep.org.junit.rules.TemporaryFolder import org.gradle.testfixtures.ProjectBuilder import org.gradle.testkit.runner.GradleRunner import org.gradle.testkit.runner.UnexpectedBuildFailure -import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.io.TempDir import java.io.File +import java.nio.file.Files +import java.nio.file.Path class SchemaRegistryPluginTest { lateinit var project: Project - lateinit var folderRule: TemporaryFolder + @TempDir + lateinit var folderRule: Path lateinit var buildFile: File private val subject = "test-subject" @BeforeEach fun init() { - folderRule = TemporaryFolder() project = ProjectBuilder.builder().build() project.pluginManager.apply(SchemaRegistryPlugin::class.java) - } - - @AfterEach - fun tearDown() { - folderRule.delete() + Files.createFile(folderRule.resolve("build.gradle")) } @Test @@ -41,8 +38,7 @@ class SchemaRegistryPluginTest { @Test fun `plugin should fail with wrong url extension configuration`() { - folderRule.create() - buildFile = folderRule.newFile("build.gradle") + buildFile = File(folderRule.toFile(), "build.gradle") buildFile.writeText( """ plugins { @@ -61,7 +57,7 @@ class SchemaRegistryPluginTest { try { GradleRunner.create() .withGradleVersion("6.7.1") - .withProjectDir(folderRule.root) + .withProjectDir(folderRule.toFile()) .withArguments(DownloadTask.TASK_NAME) .withPluginClasspath() .withDebug(true) @@ -74,8 +70,7 @@ class SchemaRegistryPluginTest { @Test fun `plugin should fail with wrong credentials extension configuration`() { - folderRule.create() - buildFile = folderRule.newFile("build.gradle") + buildFile = File(folderRule.toFile(), "build.gradle") buildFile.writeText( """ plugins { @@ -99,7 +94,7 @@ class SchemaRegistryPluginTest { try { GradleRunner.create() .withGradleVersion("6.7.1") - .withProjectDir(folderRule.root) + .withProjectDir(folderRule.toFile()) .withArguments(DownloadTask.TASK_NAME) .withPluginClasspath() .withDebug(true) @@ -112,8 +107,7 @@ class SchemaRegistryPluginTest { @Test fun `plugin should only parse nested extensions`() { - folderRule.create() - buildFile = folderRule.newFile("build.gradle") + buildFile = File(folderRule.toFile(), "build.gradle") buildFile.writeText( """ plugins { @@ -136,7 +130,7 @@ class SchemaRegistryPluginTest { try { GradleRunner.create() .withGradleVersion("6.7.1") - .withProjectDir(folderRule.root) + .withProjectDir(folderRule.toFile()) .withArguments(DownloadTask.TASK_NAME) .withPluginClasspath() .withDebug(true) diff --git a/src/test/kotlin/com/github/imflog/schema/registry/parser/AvroSchemaParserTest.kt b/src/test/kotlin/com/github/imflog/schema/registry/parser/AvroSchemaParserTest.kt index ba075c7..664abb4 100644 --- a/src/test/kotlin/com/github/imflog/schema/registry/parser/AvroSchemaParserTest.kt +++ b/src/test/kotlin/com/github/imflog/schema/registry/parser/AvroSchemaParserTest.kt @@ -3,21 +3,18 @@ package com.github.imflog.schema.registry.parser import com.github.imflog.schema.registry.LocalReference import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient import org.assertj.core.api.Assertions -import org.gradle.internal.impldep.org.junit.rules.TemporaryFolder import org.json.JSONObject -import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInstance +import org.junit.jupiter.api.io.TempDir +import java.nio.file.Path @TestInstance(TestInstance.Lifecycle.PER_CLASS) class AvroSchemaParserTest { private val schemaRegistryClient = MockSchemaRegistryClient() - private val folderRule: TemporaryFolder = TemporaryFolder().apply { create() } - private val parser = AvroSchemaParser( - schemaRegistryClient, - folderRule.root - ) + @TempDir + lateinit var folderRule: Path companion object { private const val ADDRESS_REFERENCE_NAME = "Address" @@ -39,14 +36,10 @@ class AvroSchemaParserTest { }""" } - @AfterAll - fun tearDown() { - folderRule.delete() - } - @Test fun `Should format local references correctly`() { // Given + val parser = AvroSchemaParser(schemaRegistryClient, folderRule.toFile()) val aLocalReference = givenALocalReference() // When @@ -61,7 +54,7 @@ class AvroSchemaParserTest { } private fun givenALocalReference(): LocalReference { - val addressLocalFile = folderRule.root.resolve("Address.avsc") + val addressLocalFile = folderRule.resolve("Address.avsc").toFile() addressLocalFile.writeText(ADDRESS_SCHEMA) return LocalReference(ADDRESS_REFERENCE_NAME, addressLocalFile.path) } diff --git a/src/test/kotlin/com/github/imflog/schema/registry/parser/JsonSchemaParserTest.kt b/src/test/kotlin/com/github/imflog/schema/registry/parser/JsonSchemaParserTest.kt index 045efc9..819e52f 100644 --- a/src/test/kotlin/com/github/imflog/schema/registry/parser/JsonSchemaParserTest.kt +++ b/src/test/kotlin/com/github/imflog/schema/registry/parser/JsonSchemaParserTest.kt @@ -3,21 +3,18 @@ package com.github.imflog.schema.registry.parser import com.github.imflog.schema.registry.LocalReference import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient import org.assertj.core.api.Assertions -import org.gradle.internal.impldep.org.junit.rules.TemporaryFolder import org.json.JSONObject -import org.junit.jupiter.api.AfterAll import org.junit.jupiter.api.Test import org.junit.jupiter.api.TestInstance +import org.junit.jupiter.api.io.TempDir +import java.nio.file.Path @TestInstance(TestInstance.Lifecycle.PER_CLASS) class JsonSchemaParserTest { private val schemaRegistryClient = MockSchemaRegistryClient() - private val folderRule: TemporaryFolder = TemporaryFolder().apply { create() } - private val parser = JsonSchemaParser( - schemaRegistryClient, - folderRule.root - ) + @TempDir + lateinit var folderRule: Path companion object { private const val ADDRESS_REFERENCE_NAME = "Address" @@ -43,14 +40,10 @@ class JsonSchemaParserTest { }""" } - @AfterAll - fun tearDown() { - folderRule.delete() - } - @Test fun `Should format local references correctly`() { // Given + val parser = JsonSchemaParser(schemaRegistryClient, folderRule.toFile()) val aLocalReference = givenALocalReference() // When @@ -65,7 +58,7 @@ class JsonSchemaParserTest { } private fun givenALocalReference(): LocalReference { - val addressLocalFile = folderRule.root.resolve("Address.json") + val addressLocalFile = folderRule.resolve("Address.json").toFile() addressLocalFile.writeText(ADDRESS_SCHEMA) return LocalReference(ADDRESS_REFERENCE_NAME, addressLocalFile.path) } diff --git a/src/test/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskActionTest.kt b/src/test/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskActionTest.kt index fd4aefd..a175992 100644 --- a/src/test/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskActionTest.kt +++ b/src/test/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskActionTest.kt @@ -10,25 +10,22 @@ import io.confluent.kafka.schemaregistry.rest.exceptions.Errors import io.mockk.every import io.mockk.spyk import org.assertj.core.api.Assertions -import org.gradle.internal.impldep.org.junit.rules.TemporaryFolder -import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.io.TempDir import java.io.File +import java.nio.file.Files +import java.nio.file.Path class CompatibilityTaskActionTest { - lateinit var folderRule: TemporaryFolder + @TempDir + lateinit var folderRule: Path @BeforeEach - fun setUp() { - folderRule = TemporaryFolder() - folderRule.create() - } - - @AfterEach - fun tearDown() { - folderRule.delete() + fun init() { + Files.createDirectories(folderRule.resolve("src/main/avro/external/")) + Files.createFile(folderRule.resolve("src/main/avro/external/test.avsc")) } @Test @@ -49,11 +46,10 @@ class CompatibilityTaskActionTest { listOf() ).get() ) - folderRule.newFolder("src", "main", "avro", "external") val subjects = arrayListOf(Subject("test", "src/main/avro/external/test.avsc", "AVRO")) - File(folderRule.root, "src/main/avro/external/test.avsc").writeText( + File(folderRule.toFile(), "src/main/avro/external/test.avsc").writeText( """ {"type": "record", "name": "test", @@ -68,7 +64,7 @@ class CompatibilityTaskActionTest { // when val errorCount = CompatibilityTaskAction( registryClient, - folderRule.root, + folderRule.toFile(), subjects ).run() @@ -127,8 +123,7 @@ class CompatibilityTaskActionTest { ).get() ) - folderRule.newFolder("src", "main", "avro", "external") - File(folderRule.root, "src/main/avro/external/test.avsc").writeText( + File(folderRule.toFile(), "src/main/avro/external/test.avsc").writeText( """ {"type": "record", "name": "test", @@ -153,7 +148,91 @@ class CompatibilityTaskActionTest { // when val errorCount = CompatibilityTaskAction( registryClient, - folderRule.root, + folderRule.toFile(), + subjects + ).run() + + // then + Assertions.assertThat(errorCount).isEqualTo(0) + } + + @Test + fun `Should verify compatibility when fetching latest version of references`() { + // given + val registryClient = MockSchemaRegistryClient() + registryClient.register( + "test", + registryClient.parseSchema( + AvroSchema.TYPE, + """{ + "type": "record", + "name": "test", + "fields": [ + { "name": "name", "type": "string" } + ] + }""", + listOf() + ).get() + ) + + // Register dependency + registryClient.register( + "Street", + registryClient.parseSchema( + AvroSchema.TYPE, + """{ + "type": "record", + "name": "Street", + "fields": [ + {"name": "street", "type": "string" } + ] + }""", + listOf() + ).get() + ) + + registryClient.register( + "Address", + registryClient.parseSchema( + AvroSchema.TYPE, + """{ + "type": "record", + "name": "Address", + "fields": [ + {"name": "city", "type": "string" }, + {"name": "street", "type": "Street" } + ] + }""", + listOf(SchemaReference("Street", "Street", 1)) + ).get() + ) + + File(folderRule.toFile(), "src/main/avro/external/test.avsc").writeText( + """ + {"type": "record", + "name": "test", + "fields": [ + {"name": "name", "type": "string" }, + {"name": "address", "type": [ "null", "Address" ], "default": null} + ] + } + """ + ) + + val subjects = listOf( + Subject( + "test", + "src/main/avro/external/test.avsc", + "AVRO" + ) + .addReference("Address", "Address") + .addReference("Street", "Street") + ) + + // when + val errorCount = CompatibilityTaskAction( + registryClient, + folderRule.toFile(), subjects ).run() @@ -197,8 +276,11 @@ class CompatibilityTaskActionTest { ).get() ) - folderRule.newFolder("src", "main", "avro", "local") - File(folderRule.root, "src/main/avro/local/country.avsc").writeText( + Files.createDirectories(folderRule.resolve("src/main/avro/local")) + Files.createFile(folderRule.resolve("src/main/avro/local/country.avsc")) + Files.createFile(folderRule.resolve("src/main/avro/local/address.avsc")) + + File(folderRule.toFile(), "src/main/avro/local/country.avsc").writeText( """{ "type": "enum", "name": "Country", @@ -214,7 +296,7 @@ class CompatibilityTaskActionTest { "default": "UNKNOWN" }""" ) - File(folderRule.root, "src/main/avro/local/address.avsc").writeText( + File(folderRule.toFile(), "src/main/avro/local/address.avsc").writeText( """{ "type": "record", "name": "Address", @@ -226,7 +308,7 @@ class CompatibilityTaskActionTest { }""" ) - File(folderRule.root, "src/main/avro/local/test.avsc").writeText( + File(folderRule.toFile(), "src/main/avro/local/test.avsc").writeText( """ {"type": "record", "name": "test", @@ -252,7 +334,7 @@ class CompatibilityTaskActionTest { // when val errorCount = CompatibilityTaskAction( registryClient, - folderRule.root, + folderRule.toFile(), subjects ).run() @@ -278,11 +360,10 @@ class CompatibilityTaskActionTest { listOf() ).get() ) - folderRule.newFolder("src", "main", "avro", "external") val subjects = arrayListOf(Subject("test", "src/main/avro/external/test.avsc", "AVRO")) - File(folderRule.root, "src/main/avro/external/test.avsc").writeText( + File(folderRule.toFile(), "src/main/avro/external/test.avsc").writeText( """ {"type": "record", "name": "test", @@ -296,7 +377,7 @@ class CompatibilityTaskActionTest { // when val errorCount = CompatibilityTaskAction( registryClient, - folderRule.root, + folderRule.toFile(), subjects, ).run() @@ -313,8 +394,6 @@ class CompatibilityTaskActionTest { spySchemaRegistry.testCompatibility(any(), any()) } throws RestClientException("Subject not found", 404, Errors.SUBJECT_NOT_FOUND_ERROR_CODE) - folderRule.newFolder("src", "main", "avro", "external") - val subjects = listOf( Subject( "test", @@ -322,7 +401,7 @@ class CompatibilityTaskActionTest { "AVRO" ) ) - File(folderRule.root, "src/main/avro/external/test.avsc").writeText( + File(folderRule.toFile(), "src/main/avro/external/test.avsc").writeText( """ {"type": "record", "name": "test", @@ -336,7 +415,7 @@ class CompatibilityTaskActionTest { // when val errorCount = CompatibilityTaskAction( spySchemaRegistry, - folderRule.root, + folderRule.toFile(), subjects, ).run() diff --git a/src/test/kotlin/com/github/imflog/schema/registry/tasks/download/DownloadTaskActionTest.kt b/src/test/kotlin/com/github/imflog/schema/registry/tasks/download/DownloadTaskActionTest.kt index 7993cdc..8a73e05 100644 --- a/src/test/kotlin/com/github/imflog/schema/registry/tasks/download/DownloadTaskActionTest.kt +++ b/src/test/kotlin/com/github/imflog/schema/registry/tasks/download/DownloadTaskActionTest.kt @@ -6,25 +6,14 @@ import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider import org.assertj.core.api.Assertions -import org.gradle.internal.impldep.org.junit.rules.TemporaryFolder -import org.junit.jupiter.api.AfterEach -import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.io.TempDir import java.io.File +import java.nio.file.Path class DownloadTaskActionTest { - lateinit var folderRule: TemporaryFolder - - @BeforeEach - fun setUp() { - folderRule = TemporaryFolder() - folderRule.create() - } - - @AfterEach - fun tearDown() { - folderRule.delete() - } + @TempDir + lateinit var folderRule: Path @Test fun `Should download schemas`() { @@ -63,12 +52,13 @@ class DownloadTaskActionTest { ).get() ) - folderRule.newFolder("src", "main", "avro", "external") + folderRule.resolve("src/main/avro/external").toFile().mkdir() + val folderRoot = folderRule.toFile() // when val errorCount = DownloadTaskAction( registryClient, - folderRule.root, + folderRoot, arrayListOf( DownloadSubject(testSubject, outputDir), DownloadSubject(fooSubject, outputDir) @@ -77,11 +67,11 @@ class DownloadTaskActionTest { // then Assertions.assertThat(errorCount).isEqualTo(0) - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/test.avsc")).isNotNull - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/test.avsc").readText()) + Assertions.assertThat(File(folderRoot, "src/main/avro/external/test.avsc")).isNotNull + Assertions.assertThat(File(folderRoot, "src/main/avro/external/test.avsc").readText()) .containsIgnoringCase("test") - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/foo.avsc")).isNotNull - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/foo.avsc").readText()) + Assertions.assertThat(File(folderRoot, "src/main/avro/external/foo.avsc")).isNotNull + Assertions.assertThat(File(folderRoot, "src/main/avro/external/foo.avsc").readText()) .containsIgnoringCase("foo") } @@ -135,12 +125,13 @@ class DownloadTaskActionTest { ).get() ) - folderRule.newFolder("src", "main", "avro", "external") + folderRule.resolve("src/main/avro/external").toFile().mkdir() + val folderRoot = folderRule.toFile() // when val errorCount = DownloadTaskAction( registryClient, - folderRule.root, + folderRoot, arrayListOf( DownloadSubject("te.*", outputDir, null, true) ), @@ -148,13 +139,13 @@ class DownloadTaskActionTest { // then Assertions.assertThat(errorCount).isEqualTo(0) - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/test.avsc")).exists() - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/test.avsc").readText()) + Assertions.assertThat(File(folderRoot, "src/main/avro/external/test.avsc")).exists() + Assertions.assertThat(File(folderRoot, "src/main/avro/external/test.avsc").readText()) .containsIgnoringCase("test") - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/tea.avsc")).exists() - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/tea.avsc").readText()) + Assertions.assertThat(File(folderRoot, "src/main/avro/external/tea.avsc")).exists() + Assertions.assertThat(File(folderRoot, "src/main/avro/external/tea.avsc").readText()) .containsIgnoringCase("tea") - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/foo.avsc")).doesNotExist() + Assertions.assertThat(File(folderRoot, "src/main/avro/external/foo.avsc")).doesNotExist() } @Test @@ -183,12 +174,13 @@ class DownloadTaskActionTest { ).get() ) - folderRule.newFolder("src", "main", "avro", "external") + folderRule.resolve("src/main/avro/external").toFile().mkdir() + val folderRoot = folderRule.toFile() // when val errorCount = DownloadTaskAction( registryClient, - folderRule.root, + folderRoot, arrayListOf(DownloadSubject(subject, outputDir)), ).run() @@ -214,12 +206,13 @@ class DownloadTaskActionTest { ).get() ) - folderRule.newFolder("src", "main", "avro", "external") + folderRule.resolve("src/main/avro/external").toFile().mkdir() + val folderRoot = folderRule.toFile() // when val errorCount = DownloadTaskAction( registryClient, - folderRule.root, + folderRoot, arrayListOf( DownloadSubject(invalidSubjectPattern, outputDir, null, true), DownloadSubject("test", outputDir) @@ -228,8 +221,8 @@ class DownloadTaskActionTest { // then Assertions.assertThat(errorCount).isEqualTo(0) - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/test.avsc")).exists() - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/test.avsc").readText()) + Assertions.assertThat(File(folderRoot, "src/main/avro/external/test.avsc")).exists() + Assertions.assertThat(File(folderRoot, "src/main/avro/external/test.avsc").readText()) .containsIgnoringCase("test") } @@ -267,12 +260,13 @@ class DownloadTaskActionTest { ) Assertions.assertThat(v1Id).isNotEqualTo(v2Id) - folderRule.newFolder("src", "main", "avro", "external") + folderRule.resolve("src/main/avro/external").toFile().mkdir() + val folderRoot = folderRule.toFile() // When val errorCount = DownloadTaskAction( registryClient, - folderRule.root, + folderRoot, arrayListOf( DownloadSubject("test", outputDir, v1Id) ), @@ -280,10 +274,10 @@ class DownloadTaskActionTest { // Then Assertions.assertThat(errorCount).isEqualTo(0) - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/test.avsc")).isNotNull - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/test.avsc").readText()) + Assertions.assertThat(File(folderRoot, "src/main/avro/external/test.avsc")).isNotNull + Assertions.assertThat(File(folderRoot, "src/main/avro/external/test.avsc").readText()) .containsIgnoringCase("test") - Assertions.assertThat(File(folderRule.root, "src/main/avro/external/test.avsc").readText()) + Assertions.assertThat(File(folderRoot, "src/main/avro/external/test.avsc").readText()) .doesNotContain("desc") } } diff --git a/src/test/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskActionTest.kt b/src/test/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskActionTest.kt index 0ae6884..36133d5 100644 --- a/src/test/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskActionTest.kt +++ b/src/test/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskActionTest.kt @@ -8,24 +8,21 @@ import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider import org.assertj.core.api.Assertions -import org.gradle.internal.impldep.org.junit.rules.TemporaryFolder -import org.junit.jupiter.api.AfterEach import org.junit.jupiter.api.BeforeEach import org.junit.jupiter.api.Test +import org.junit.jupiter.api.io.TempDir import java.io.File +import java.nio.file.Files +import java.nio.file.Path class RegisterTaskActionTest { - lateinit var folderRule: TemporaryFolder + @TempDir + lateinit var folderRule: Path @BeforeEach - fun setUp() { - folderRule = TemporaryFolder() - folderRule.create() - } - - @AfterEach - fun tearDown() { - folderRule.delete() + fun init() { + Files.createDirectories(folderRule.resolve("src/main/avro/external/")) + Files.createFile(folderRule.resolve("src/main/avro/external/test.avsc")) } @Test @@ -33,8 +30,7 @@ class RegisterTaskActionTest { // given val registryClient = MockSchemaRegistryClient(listOf(AvroSchemaProvider(), JsonSchemaProvider(), ProtobufSchemaProvider())) - folderRule.newFolder("src", "main", "avro", "external") - File(folderRule.root, "src/main/avro/external/test.avsc").writeText( + File(folderRule.toFile(), "src/main/avro/external/test.avsc").writeText( """ {"type": "record", "name": "test", @@ -53,7 +49,7 @@ class RegisterTaskActionTest { // when val errorCount = RegisterTaskAction( registryClient, - folderRule.root, + folderRule.toFile(), subjects, null ).run() @@ -77,8 +73,7 @@ class RegisterTaskActionTest { ).get() ) - folderRule.newFolder("src", "main", "avro", "external") - File(folderRule.root, "src/main/avro/external/test.avsc").writeText( + File(folderRule.toFile(), "src/main/avro/external/test.avsc").writeText( """ {"type": "record", "name": "test", @@ -101,7 +96,7 @@ class RegisterTaskActionTest { // when val errorCount = RegisterTaskAction( registryClient, - folderRule.root, + folderRule.toFile(), subjects, null ).run() @@ -126,8 +121,6 @@ class RegisterTaskActionTest { ).get() ) - folderRule.newFolder("src", "main", "avro", "external") - // Register dependency registryClient.register( "Street", @@ -160,7 +153,7 @@ class RegisterTaskActionTest { ).get() ) - File(folderRule.root, "src/main/avro/external/test.avsc").writeText( + File(folderRule.toFile(), "src/main/avro/external/test.avsc").writeText( """ {"type": "record", "name": "test", @@ -186,7 +179,89 @@ class RegisterTaskActionTest { // when val errorCount = RegisterTaskAction( registryClient, - folderRule.root, + folderRule.toFile(), + subjects, + null + ).run() + + // then + Assertions.assertThat(errorCount).isEqualTo(0) + Assertions.assertThat(registryClient.getLatestSchemaMetadata("test")).isNotNull + } + + @Test + fun `Should register schema with latest remote version of references`() { + // given + val registryClient = + MockSchemaRegistryClient(listOf(AvroSchemaProvider(), JsonSchemaProvider(), ProtobufSchemaProvider())) + registryClient.register( + "test", + registryClient.parseSchema( + AvroSchema.TYPE, + """{"type": "record", "name": "test", "fields": [{ "name": "name", "type": "string" }]}""", + listOf() + ).get() + ) + + // Register dependency + registryClient.register( + "Street", + registryClient.parseSchema( + AvroSchema.TYPE, + """{ + "type": "record", + "name": "Street", + "fields": [ + {"name": "street", "type": "string" } + ] + }""", + listOf() + ).get() + ) + + registryClient.register( + "Address", + registryClient.parseSchema( + AvroSchema.TYPE, + """{ + "type": "record", + "name": "Address", + "fields": [ + {"name": "city", "type": "string" }, + {"name": "street", "type": "Street" } + ] + }""", + listOf(SchemaReference("Street", "Street", 1)) + ).get() + ) + + File(folderRule.toFile(), "src/main/avro/external/test.avsc").writeText( + """ + {"type": "record", + "name": "test", + "fields": [ + {"name": "name", "type": "string" }, + {"name": "address", "type": "Address"} + ] + } + """.trimIndent() + ) + + + val subjects = listOf( + Subject( + "test", + "src/main/avro/external/test.avsc", + "AVRO" + ) + .addReference("Address", "Address") + .addReference("Street", "Street") + ) + + // when + val errorCount = RegisterTaskAction( + registryClient, + folderRule.toFile(), subjects, null ).run() @@ -201,9 +276,8 @@ class RegisterTaskActionTest { // given val registryClient = MockSchemaRegistryClient(listOf(AvroSchemaProvider(), JsonSchemaProvider(), ProtobufSchemaProvider())) - folderRule.newFolder("src", "main", "avro", "external") - val resultFolder = folderRule.newFolder("results", "avro") - File(folderRule.root, "src/main/avro/external/test.avsc") + val resultFolder = Files.createDirectories(folderRule.resolve("results/avro")).toFile() + File(folderRule.toFile(), "src/main/avro/external/test.avsc") .writeText( """ {"type": "record", @@ -215,7 +289,7 @@ class RegisterTaskActionTest { } """ ) - File(folderRule.root, "src/main/avro/external/test_2.avsc") + File(folderRule.toFile(), "src/main/avro/external/test_2.avsc") .writeText( """ {"type": "record", @@ -236,7 +310,7 @@ class RegisterTaskActionTest { // when RegisterTaskAction( registryClient, - folderRule.root, + folderRule.toFile(), subjects, resultFolder.path ).run() From 583e2461bfabc57eef43f34a48ca7eec98ed5a6f Mon Sep 17 00:00:00 2001 From: Nick Whelan Date: Tue, 18 Apr 2023 14:17:41 -0500 Subject: [PATCH 2/3] Removing an unnecessary utility class, now that I know the Confluent schema registry accepts `-1` to use the latest version --- README.md | 4 +- .../github/imflog/schema/registry/Subject.kt | 2 +- .../compatibility/CompatibilityTaskAction.kt | 2 - .../tasks/register/RegisterTaskAction.kt | 5 --- .../support/ReferenceCurrentVersionUtil.kt | 38 ------------------- .../CompatibilityTaskActionTest.kt | 2 +- .../tasks/register/RegisterTaskActionTest.kt | 2 +- 7 files changed, 6 insertions(+), 49 deletions(-) delete mode 100644 src/main/kotlin/com/github/imflog/schema/registry/tasks/support/ReferenceCurrentVersionUtil.kt diff --git a/README.md b/README.md index dfdc567..ac94cfd 100644 --- a/README.md +++ b/README.md @@ -207,13 +207,15 @@ if you need information about the registered id. } } ``` -* For remote references, if no version is specified, the latest version of schema is fetched and used from the schema registry: +* For remote references, if no version is specified, the latest version of schema is fetched and used from the schema registry. + You can also specify `-1` for the same functionality ```groovy schemaRegistry { url = 'http://registry-url:8081' register { subject('avroWithRemoteReferences', '/absolutPath/dependent/path.avsc', "AVRO") .addReference('avroSubject', 'avroSubjectType') + .addReference('avroSubjectTwo', 'avroSubjectTwoType', -1) } } ``` diff --git a/src/main/kotlin/com/github/imflog/schema/registry/Subject.kt b/src/main/kotlin/com/github/imflog/schema/registry/Subject.kt index 6f1d284..04d3c6f 100644 --- a/src/main/kotlin/com/github/imflog/schema/registry/Subject.kt +++ b/src/main/kotlin/com/github/imflog/schema/registry/Subject.kt @@ -16,7 +16,7 @@ data class Subject( } fun addReference(name: String, subject:String): Subject { - references.add(SchemaReference(name, subject, 0)) + references.add(SchemaReference(name, subject, -1)) return this } diff --git a/src/main/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskAction.kt b/src/main/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskAction.kt index 1ce9cd6..ac65915 100644 --- a/src/main/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskAction.kt +++ b/src/main/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskAction.kt @@ -3,7 +3,6 @@ package com.github.imflog.schema.registry.tasks.compatibility import com.github.imflog.schema.registry.LoggingUtils.infoIfNotQuiet import com.github.imflog.schema.registry.Subject import com.github.imflog.schema.registry.parser.SchemaParser -import com.github.imflog.schema.registry.tasks.support.ReferenceCurrentVersionUtil.updateNonPositiveReferencesToCurrentVersion import com.github.imflog.schema.registry.toSchemaType import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException @@ -24,7 +23,6 @@ class CompatibilityTaskAction( var errorCount = 0 for ((subject, path, type, remoteReferences, localReferences) in subjects) { logger.debug("Loading schema for subject($subject) from $path.") - updateNonPositiveReferencesToCurrentVersion(client, remoteReferences) val isCompatible = try { val parsedSchema = SchemaParser .provide(type.toSchemaType(), client, rootDir) diff --git a/src/main/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskAction.kt b/src/main/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskAction.kt index 6f04934..c0583d9 100644 --- a/src/main/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskAction.kt +++ b/src/main/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskAction.kt @@ -5,8 +5,6 @@ import com.github.imflog.schema.registry.LoggingUtils.infoIfNotQuiet import com.github.imflog.schema.registry.SchemaType import com.github.imflog.schema.registry.Subject import com.github.imflog.schema.registry.parser.SchemaParser -import com.github.imflog.schema.registry.tasks.support.ReferenceCurrentVersionUtil -import com.github.imflog.schema.registry.tasks.support.ReferenceCurrentVersionUtil.updateNonPositiveReferencesToCurrentVersion import com.github.imflog.schema.registry.toSchemaType import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference @@ -31,9 +29,6 @@ class RegisterTaskAction( writeOutputFileHeader() subjects.forEach { (subject, path, type, references: List, localReferences) -> try { - updateNonPositiveReferencesToCurrentVersion( - client, references - ) val schemaId = registerSchema(subject, path, type.toSchemaType(), references, localReferences) writeRegisteredSchemaOutput(subject, path, schemaId) } catch (e: Exception) { diff --git a/src/main/kotlin/com/github/imflog/schema/registry/tasks/support/ReferenceCurrentVersionUtil.kt b/src/main/kotlin/com/github/imflog/schema/registry/tasks/support/ReferenceCurrentVersionUtil.kt deleted file mode 100644 index 91096c5..0000000 --- a/src/main/kotlin/com/github/imflog/schema/registry/tasks/support/ReferenceCurrentVersionUtil.kt +++ /dev/null @@ -1,38 +0,0 @@ -package com.github.imflog.schema.registry.tasks.support - -import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient -import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference -import org.gradle.api.GradleScriptException -import org.gradle.api.logging.Logging - -object ReferenceCurrentVersionUtil { - private val logger = Logging.getLogger(ReferenceCurrentVersionUtil::class.java) - - fun updateNonPositiveReferencesToCurrentVersion( - client: SchemaRegistryClient, - references: List - ) { - references.forEach { - if (it.version <= 0) { - it.version = getReferenceCurrentVersion(client, it) - } - } - } - - private fun getReferenceCurrentVersion( - client: SchemaRegistryClient, - reference: SchemaReference - ): Int { - logger.debug("Fetching latest remote version for '$reference'") - - val schemas = client.getSchemas(reference.subject, false, true); - if (schemas.size == 0) { - throw GradleScriptException("Did not find any schemas with the subject '${reference.subject}", Throwable()) - } - if (schemas.size > 1) { - throw GradleScriptException("Found more than one schema with the subject prefix '${reference.subject}.", Throwable()) - } - - return client.getVersion(reference.subject, schemas[0]) - } -} \ No newline at end of file diff --git a/src/test/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskActionTest.kt b/src/test/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskActionTest.kt index a175992..ac3a1e0 100644 --- a/src/test/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskActionTest.kt +++ b/src/test/kotlin/com/github/imflog/schema/registry/tasks/compatibility/CompatibilityTaskActionTest.kt @@ -226,7 +226,7 @@ class CompatibilityTaskActionTest { "AVRO" ) .addReference("Address", "Address") - .addReference("Street", "Street") + .addReference("Street", "Street", -1) ) // when diff --git a/src/test/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskActionTest.kt b/src/test/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskActionTest.kt index 36133d5..762e631 100644 --- a/src/test/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskActionTest.kt +++ b/src/test/kotlin/com/github/imflog/schema/registry/tasks/register/RegisterTaskActionTest.kt @@ -255,7 +255,7 @@ class RegisterTaskActionTest { "AVRO" ) .addReference("Address", "Address") - .addReference("Street", "Street") + .addReference("Street", "Street", -1) ) // when From a55df18f5e31f96bec25b45a35d79e57206679ed Mon Sep 17 00:00:00 2001 From: Nick Whelan Date: Wed, 19 Apr 2023 15:16:29 -0500 Subject: [PATCH 3/3] Improving README.md from feedback --- README.md | 28 ++++++++++++++-------------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index ac94cfd..d496d2a 100644 --- a/README.md +++ b/README.md @@ -116,6 +116,8 @@ schemaRegistry { .addLocalReference("localAvroSubject", "/a/local/path.avsc") subject('avroWithRemoteReferences', '/absolutPath/dependent/path.avsc', "AVRO") .addReference('avroSubject', 'avroSubjectType', 1) + .addReference('avroSubjectLatestVersion', 'avroSubjectLatestVersionType') + .addReference('avroSubjectLatestVersionExplicit', 'avroSubjectLatestVersionExplicitType', -1) subject('protoWithReferences', 'dependent/path.proto', "PROTOBUF").addReference('protoSubject', 'protoSubjectType', 1) subject('jsonWithReferences', 'dependent/path.json', "JSON").addReference('jsonSubject', 'jsonSubjectType', 1) } @@ -124,7 +126,11 @@ schemaRegistry { You have to list all the (subject, avsc file path) pairs that you want to test. If you have references with other schemas stored in the registry that are required before the compatibility check, -you can call the `addReference("name", "subject", version)`, this will add a reference to fetch dynamically from the registry. +you can call the `addReference("name", "subject", version)`, +this will add a reference to use from the registry. +A convenience method, `addReference("name", "subject")`, +uses the latest version of the schema in the registry. +You can also specify `-1` explicitly to use the latest version. The addReference calls can be chained. If you have local references to add before calling the compatibility in the registry, @@ -175,13 +181,19 @@ schemaRegistry { .addLocalReference("localAvroSubject", "/a/local/path.avsc") subject('avroWithRemoteReferences', '/absolutPath/dependent/path.avsc', "AVRO") .addReference('avroSubject', 'avroSubjectType', 1) + .addReference('avroSubjectLatestVersion', 'avroSubjectLatestVersionType') + .addReference('avroSubjectLatestVersionExplicit', 'avroSubjectLatestVersionExplicitType', -1) subject('protoWithReferences', 'dependent/path.proto', "PROTOBUF").addReference('protoSubject', 'protoSubjectType', 1) subject('jsonWithReferences', 'dependent/path.json', "JSON").addReference('jsonSubject', 'jsonSubjectType', 1) } } ``` If you have references to other schemas required before the register, -you can call the `addReference("name", "subject", version)`, this will add a reference to use from the registry. +you can call the `addReference("name", "subject", version)`, +this will add a reference to use from the registry. +A convenience method, `addReference("name", "subject")`, +uses the latest version of the schema in the registry. +You can also specify `-1` explicitly to use the latest version. The addReference calls can be chained. If you have local references to add before calling the register, @@ -207,18 +219,6 @@ if you need information about the registered id. } } ``` -* For remote references, if no version is specified, the latest version of schema is fetched and used from the schema registry. - You can also specify `-1` for the same functionality -```groovy -schemaRegistry { - url = 'http://registry-url:8081' - register { - subject('avroWithRemoteReferences', '/absolutPath/dependent/path.avsc', "AVRO") - .addReference('avroSubject', 'avroSubjectType') - .addReference('avroSubjectTwo', 'avroSubjectTwoType', -1) - } -} -``` #### Avro Mixing local and remote references is perfectly fine for Avro without specific configurations.