Skip to content

Commit

Permalink
feat: add fail fast behaviour (#193)
Browse files Browse the repository at this point in the history
  • Loading branch information
ImFlog authored Jan 26, 2025
1 parent d6934d7 commit b5aa723
Show file tree
Hide file tree
Showing 14 changed files with 405 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.github.imflog.schema.registry

import org.gradle.api.file.DirectoryProperty
import org.gradle.api.model.ObjectFactory
import org.gradle.api.provider.Property

Expand All @@ -24,4 +23,8 @@ open class SchemaRegistryExtension(objects: ObjectFactory) {
val pretty: Property<Boolean> = objects.property(Boolean::class.java).apply {
convention(false)
}

val failFast: Property<Boolean> = objects.property(Boolean::class.java).apply {
convention(false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class SchemaRegistryPlugin : Plugin<Project> {
)
val downloadExtension = (globalExtension as ExtensionAware).extensions.create(
DownloadSubjectExtension.EXTENSION_NAME,
DownloadSubjectExtension::class.java
DownloadSubjectExtension::class.java,
)
val registerExtension = (globalExtension as ExtensionAware).extensions.create(
RegisterSubjectExtension.EXTENSION_NAME,
Expand All @@ -60,6 +60,7 @@ class SchemaRegistryPlugin : Plugin<Project> {
downloadTask.subjects.set(downloadExtension.subjects)
downloadTask.metadataConfig.set(downloadExtension.metadata)
downloadTask.pretty.set(globalExtension.pretty)
downloadTask.failFast.set(globalExtension.failFast)
}

tasks.register(RegisterSchemasTask.TASK_NAME, RegisterSchemasTask::class.java)
Expand All @@ -69,6 +70,7 @@ class SchemaRegistryPlugin : Plugin<Project> {
registerSchemasTask.ssl.set(sslExtension.configs)
registerSchemasTask.subjects.set(registerExtension.subjects)
registerSchemasTask.outputDirectory.set(globalExtension.outputDirectory)
registerSchemasTask.failFast.set(globalExtension.failFast)
}

tasks.register(CompatibilityTask.TASK_NAME, CompatibilityTask::class.java)
Expand All @@ -77,6 +79,7 @@ class SchemaRegistryPlugin : Plugin<Project> {
compatibilityTask.basicAuth.set(basicAuthExtension.basicAuth)
compatibilityTask.ssl.set(sslExtension.configs)
compatibilityTask.subjects.set(compatibilityExtension.subjects)
compatibilityTask.failFast.set(globalExtension.failFast)
}

tasks.register(ConfigTask.TASK_NAME, ConfigTask::class.java)
Expand All @@ -85,6 +88,7 @@ class SchemaRegistryPlugin : Plugin<Project> {
configTask.basicAuth.set(basicAuthExtension.basicAuth)
configTask.ssl.set(sslExtension.configs)
configTask.subjects.set(configExtension.subjects)
configTask.failFast.set(globalExtension.failFast)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,16 @@ open class CompatibilityTask @Inject constructor(objects: ObjectFactory) : Defau
@Input
val subjects: ListProperty<Subject> = objects.listProperty(Subject::class.java)

@Input
val failFast: Property<Boolean> = objects.property(Boolean::class.java)

@TaskAction
fun testCompatibility() {
val errorCount = CompatibilityTaskAction(
RegistryClientWrapper.client(url.get(), basicAuth.get(), ssl.get()),
project.rootDir,
subjects.get()
subjects.get(),
failFast.getOrElse(false)
).run()
if (errorCount > 0) {
throw GradleScriptException("$errorCount schemas not compatible, see logs for details.", Throwable())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import com.github.imflog.schema.registry.toSchemaType
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors
import org.gradle.api.GradleScriptException
import org.gradle.api.logging.Logging
import java.io.File
import java.io.IOException
Expand All @@ -15,6 +16,7 @@ class CompatibilityTaskAction(
private val client: SchemaRegistryClient,
private val rootDir: File,
private val subjects: List<Subject>,
private val failFast: Boolean = false,
) {

private val logger = Logging.getLogger(CompatibilityTaskAction::class.java)
Expand All @@ -35,18 +37,28 @@ class CompatibilityTaskAction(
}
} catch (_: Exception) {
// If we use a confluent version < 6.1.0 this call may fail as the API response would be a boolean instead of the expected String list.
} finally {
if (failFast) {
throw GradleScriptException("Schema ${subject.file} is not compatible with subject: ${subject.inputSubject}", Throwable())
}
}
}
isCompatible
} catch (ioEx: IOException) {
logger.error("", ioEx)
if (failFast) {
throw ioEx
}
false
} catch (restEx: RestClientException) {
// If the subject does not exist, it is compatible
if (restEx.errorCode == Errors.SUBJECT_NOT_FOUND_ERROR_CODE) {
true
} else {
logger.error("", restEx)
if (failFast) {
throw restEx
}
false
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,15 @@ open class ConfigTask @Inject constructor(objects: ObjectFactory) : DefaultTask(
@Input
val subjects: ListProperty<ConfigSubject> = objects.listProperty(ConfigSubject::class.java)

@Input
val failFast: Property<Boolean> = objects.property(Boolean::class.java)

@TaskAction
fun configureSubjects() {
val errorCount = ConfigTaskAction(
RegistryClientWrapper.client(url.get(), basicAuth.get(), ssl.get()),
subjects.get()
subjects.get(),
failFast.getOrElse(false)
).run()
if (errorCount > 0) {
throw GradleScriptException("$errorCount subject configuration not set, see logs for details", Throwable())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ import org.gradle.api.logging.Logging

class ConfigTaskAction(
private val client: SchemaRegistryClient,
private val subjects: List<ConfigSubject>
private val subjects: List<ConfigSubject>,
private val failFast: Boolean = false,
) {

private val logger = Logging.getLogger(ConfigTaskAction::class.java)
Expand All @@ -18,14 +19,20 @@ class ConfigTaskAction(
logger.debug("$subject: setting config $config")
try {
if (CompatibilityLevel.forName(config) == null) {
logger.error("'$config' is not a valid schema registry compatibility")
logger.error("'$config' is not a valid schema registry compatibility level")
errorCount++
if (failFast) {
throw IllegalArgumentException("'$config' is not a valid schema registry compatibility level")
}
} else {
client.updateCompatibility(subject, config)
}
} catch (ex: RestClientException) {
logger.error("Error during compatibility update for $subject", ex)
errorCount++
if (failFast) {
throw ex
}
}
}
return errorCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,18 @@ open class DownloadTask @Inject constructor(objects: ObjectFactory) : DefaultTas
@Input
val pretty: Property<Boolean> = objects.property(Boolean::class.java)

@Input
val failFast: Property<Boolean> = objects.property(Boolean::class.java)

@TaskAction
fun downloadSchemas() {
val errorCount = DownloadTaskAction(
RegistryClientWrapper.client(url.get(), basicAuth.get(), ssl.get()),
project.rootDir,
subjects.get(),
metadataConfig.get(),
pretty.get()
pretty.get(),
failFast.getOrElse(false),
).run()
if (errorCount > 0) {
throw GradleScriptException("$errorCount schemas not downloaded, see logs for details", Throwable())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ class DownloadTaskAction(
private val rootDir: File,
private val subjects: List<DownloadSubject>,
private val metadataConfiguration: MetadataExtension,
private val pretty: Boolean = false
private val pretty: Boolean = false,
private val failFast: Boolean = false,
) {

private val logger = Logging.getLogger(DownloadTaskAction::class.java)
Expand Down Expand Up @@ -62,6 +63,9 @@ class DownloadTaskAction(
} catch (e: Exception) {
logger.error("Error during schema retrieval for ${downloadSubject.subject}", e)
errorCount++
if (failFast) {
throw e
}
}
}
return errorCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,17 @@ abstract class RegisterSchemasTask @Inject constructor(objects: ObjectFactory) :
@Optional
val outputDirectory: Property<String> = objects.property(String::class.java)

@Input
val failFast: Property<Boolean> = objects.property(Boolean::class.java)

@TaskAction
fun registerSchemas() {
val errorCount = RegisterTaskAction(
RegistryClientWrapper.client(url.get(), basicAuth.get(), ssl.get()),
project.rootDir,
subjects.get(),
outputDirectory.orNull
outputDirectory.orNull,
failFast.getOrElse(false)
).run()
if (errorCount > 0) {
throw GradleScriptException("$errorCount schemas not registered, see logs for details", Throwable())
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
package com.github.imflog.schema.registry.tasks.register

import com.github.imflog.schema.registry.LocalReference
import com.github.imflog.schema.registry.LoggingUtils.infoIfNotQuiet
import com.github.imflog.schema.registry.SchemaType
import com.github.imflog.schema.registry.Subject
import com.github.imflog.schema.registry.parser.SchemaParser
import com.github.imflog.schema.registry.toSchemaType
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient
import io.confluent.kafka.schemaregistry.client.rest.entities.Metadata
import io.confluent.kafka.schemaregistry.client.rest.entities.RuleSet
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference
import org.gradle.api.logging.Logging
import java.io.File

Expand All @@ -18,7 +13,8 @@ class RegisterTaskAction(
private val client: SchemaRegistryClient,
private val rootDir: File,
private val subjects: List<Subject>,
outputDir: String?
outputDir: String?,
private val failFast: Boolean = false,
) {

private val logger = Logging.getLogger(RegisterTaskAction::class.java)
Expand All @@ -32,9 +28,12 @@ class RegisterTaskAction(
subjects.forEach { subject ->
try {
val schemaId = registerSchema(subject)
writeRegisteredSchemaOutput(subject.inputSubject,subject.file, schemaId)
writeRegisteredSchemaOutput(subject.inputSubject, subject.file, schemaId)
} catch (e: Exception) {
logger.error("Could not register schema for '$subject'", e)
if (failFast) {
throw e
}
errorCount++
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -423,4 +423,127 @@ class CompatibilityTaskActionTest {
// then
Assertions.assertThat(errorCount).isEqualTo(0)
}
}

@Test
fun `Should fail fast`() {
// given
val registryClient = MockSchemaRegistryClient()
registryClient.register(
"test",
registryClient.parseSchema(
AvroSchema.TYPE,
"""{
"type": "record",
"name": "test",
"fields": [
{ "name": "name", "type": "string" }
]
}""",
listOf()
).get()
)

// Compatible
File(folderRule.toFile(), "src/main/avro/external/test.avsc").writeText(
"""
{"type": "record",
"name": "test",
"fields": [
{"name": "name", "type": "string" },
{"name": "newField", "type": "string", "default": ""}
]
}
"""
)
// Incompatible
File(folderRule.toFile(), "src/main/avro/external/test_incompat.avsc").writeText(
"""
{"type": "record",
"name": "test",
"fields": [
{"name": "name", "type": "boolean" }
]
}
"""
)
val subjects = arrayListOf(
Subject("test", "src/main/avro/external/test.avsc", "AVRO"),
Subject("test", "src/main/avro/external/test_incompat.avsc", "AVRO"),
Subject("test", "src/main/avro/external/test.avsc", "AVRO"),
)

// when
try {
CompatibilityTaskAction(
registryClient,
folderRule.toFile(),
subjects,
failFast = true,
).run()
Assertions.fail("Should have thrown an exception")
} catch (e: Exception) {
// then
// Nothing to do
}
}

@Test
fun `Should fail silently on error`() {
// given
val registryClient = MockSchemaRegistryClient()
registryClient.register(
"test",
registryClient.parseSchema(
AvroSchema.TYPE,
"""{
"type": "record",
"name": "test",
"fields": [
{ "name": "name", "type": "string" }
]
}""",
listOf()
).get()
)

// Compatible
File(folderRule.toFile(), "src/main/avro/external/test.avsc").writeText(
"""
{"type": "record",
"name": "test",
"fields": [
{"name": "name", "type": "string" },
{"name": "newField", "type": "string", "default": ""}
]
}
"""
)
// Incompatible
File(folderRule.toFile(), "src/main/avro/external/test_incompat.avsc").writeText(
"""
{"type": "record",
"name": "test",
"fields": [
{"name": "name", "type": "boolean" }
]
}
"""
)
val subjects = arrayListOf(
Subject("test", "src/main/avro/external/test.avsc", "AVRO"),
Subject("test", "src/main/avro/external/test_incompat.avsc", "AVRO"),
Subject("test", "src/main/avro/external/test.avsc", "AVRO"),
)

// when
val errorCount = CompatibilityTaskAction(
registryClient,
folderRule.toFile(),
subjects,
// failFast = false,
).run()

// then
Assertions.assertThat(errorCount).isEqualTo(1)
}
}
Loading

0 comments on commit b5aa723

Please sign in to comment.