diff --git a/LICENSE-binary b/LICENSE-binary index 3e2ba5a7f1574..08092f6def97c 100644 --- a/LICENSE-binary +++ b/LICENSE-binary @@ -302,7 +302,6 @@ see: licenses/CDDL+GPL-1.1 javax.activation-api-1.2.0 javax.annotation-api-1.3.2 javax.servlet-api-3.1.0 -javax.ws.rs-api-2.1.1 jaxb-api-2.3.1 activation-1.1.1 diff --git a/build.gradle b/build.gradle index cfe1dd57da271..474f5b5f73b80 100644 --- a/build.gradle +++ b/build.gradle @@ -1835,7 +1835,7 @@ project(':generator') { implementation libs.argparse4j implementation libs.jacksonDatabind implementation libs.jacksonJDK8Datatypes - implementation libs.jacksonJaxrsJsonProvider + implementation libs.jacksonJakartarsJsonProvider implementation 'org.eclipse.jgit:org.eclipse.jgit:6.4.0.202211300538-r' // SSH support for JGit based on Apache MINA sshd @@ -1882,7 +1882,7 @@ project(':clients') { compileOnly libs.jose4j // for SASL/OAUTHBEARER JWT validation; only used by broker testImplementation libs.bcpkix - testImplementation libs.jacksonJaxrsJsonProvider + testImplementation libs.jacksonJakartarsJsonProvider testImplementation libs.jose4j testImplementation libs.junitJupiter testImplementation libs.reload4j @@ -2446,7 +2446,7 @@ project(':tools') { implementation libs.re2j implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation - implementation libs.jacksonJaxrsJsonProvider + implementation libs.jacksonJakartarsJsonProvider testImplementation project(':clients') testImplementation project(':clients').sourceSets.test.output @@ -2515,14 +2515,20 @@ project(':trogdor') { implementation libs.slf4jApi runtimeOnly libs.reload4j - implementation libs.jacksonJaxrsJsonProvider + implementation libs.jacksonJakartarsJsonProvider implementation libs.jerseyContainerServlet implementation libs.jerseyHk2 implementation libs.jaxbApi // Jersey dependency that was available in the JDK before Java 9 implementation libs.activation // Jersey dependency that was available in the JDK before Java 9 - implementation libs.jettyServer - implementation libs.jettyServlet - implementation libs.jettyServlets + implementation (libs.jettyServer) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyServlet) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyServlets) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } implementation project(':group-coordinator') implementation project(':group-coordinator:group-coordinator-api') @@ -2572,7 +2578,7 @@ project(':shell') { implementation project(':raft') implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation - implementation libs.jacksonJaxrsJsonProvider + implementation libs.jacksonJakartarsJsonProvider testImplementation project(':clients') testImplementation project(':clients').sourceSets.test.output @@ -2895,11 +2901,7 @@ project(':streams:examples') { dependencies { // this dependency should be removed after we unify data API - implementation(project(':connect:json')) { - // this transitive dependency is not used in Streams, and it breaks SBT builds - exclude module: 'javax.ws.rs-api' - } - + implementation(project(':connect:json')) implementation project(':streams') implementation libs.slf4jReload4j @@ -3342,7 +3344,7 @@ project(':connect:api') { api project(':clients') implementation libs.slf4jApi runtimeOnly libs.reload4j - implementation libs.jaxrsApi + implementation libs.jakartaRsApi testImplementation libs.junitJupiter testImplementation project(':clients').sourceSets.test.output @@ -3466,15 +3468,23 @@ project(':connect:runtime') { implementation libs.slf4jReload4j implementation libs.jose4j // for SASL/OAUTHBEARER JWT validation implementation libs.jacksonAnnotations - implementation libs.jacksonJaxrsJsonProvider + implementation libs.jacksonJakartarsJsonProvider implementation libs.jerseyContainerServlet implementation libs.jerseyHk2 implementation libs.jaxbApi // Jersey dependency that was available in the JDK before Java 9 implementation libs.activation // Jersey dependency that was available in the JDK before Java 9 - implementation libs.jettyServer - implementation libs.jettyServlet - implementation libs.jettyServlets - implementation libs.jettyClient + implementation (libs.jettyServer) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyServlet) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyServlets) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyClient) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } implementation libs.classgraph implementation libs.mavenArtifact implementation libs.swaggerAnnotations @@ -3637,7 +3647,7 @@ project(':connect:basic-auth-extension') { implementation project(':connect:api') implementation libs.slf4jApi runtimeOnly libs.reload4j - implementation libs.jaxrsApi + implementation libs.jakartaRsApi implementation libs.jaxAnnotationApi testImplementation libs.bcpkix @@ -3683,15 +3693,23 @@ project(':connect:mirror') { implementation libs.slf4jApi runtimeOnly libs.reload4j implementation libs.jacksonAnnotations - implementation libs.jacksonJaxrsJsonProvider + implementation libs.jacksonJakartarsJsonProvider implementation libs.jerseyContainerServlet implementation libs.jerseyHk2 implementation libs.jaxbApi // Jersey dependency that was available in the JDK before Java 9 implementation libs.activation // Jersey dependency that was available in the JDK before Java 9 - implementation libs.jettyServer - implementation libs.jettyServlet - implementation libs.jettyServlets - implementation libs.jettyClient + implementation (libs.jettyServer) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyServlet) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyServlets) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } + implementation (libs.jettyClient) { + exclude group: 'org.slf4j', module: 'slf4j-api' + } implementation libs.swaggerAnnotations testImplementation libs.junitJupiter diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index c6ec12994fdbd..43811be2f40db 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -360,8 +360,8 @@ - - + + @@ -529,7 +529,7 @@ - + @@ -555,8 +555,8 @@ - - + + @@ -571,10 +571,10 @@ - - + + - + @@ -588,7 +588,7 @@ - + @@ -620,8 +620,8 @@ - - + + @@ -633,7 +633,7 @@ - + diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java index 73f87dd04ee05..ca960414dd5b8 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtension.java @@ -51,7 +51,7 @@ public interface ConnectRestExtension extends Configurable, Versioned, Closeable * will invoke this method after registering the default Connect resources. If the implementations attempt * to re-register any of the Connect resources, it will be ignored and will be logged. * - * @param restPluginContext The context provides access to JAX-RS {@link javax.ws.rs.core.Configurable} and {@link + * @param restPluginContext The context provides access to JAX-RS {@link jakarta.ws.rs.core.Configurable} and {@link * ConnectClusterState}.The custom JAX-RS resources can be registered via the {@link * ConnectRestExtensionContext#configurable()} */ diff --git a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java index 5e357be8c9142..0bfcee678b1aa 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/rest/ConnectRestExtensionContext.java @@ -19,19 +19,20 @@ import org.apache.kafka.connect.health.ConnectClusterState; -import javax.ws.rs.core.Configurable; +import jakarta.ws.rs.core.Configurable; + /** * The interface provides the ability for {@link ConnectRestExtension} implementations to access the JAX-RS - * {@link javax.ws.rs.core.Configurable} and cluster state {@link ConnectClusterState}. The implementation for the interface is provided + * {@link jakarta.ws.rs.core.Configurable} and cluster state {@link ConnectClusterState}. The implementation for the interface is provided * by the Connect framework. */ public interface ConnectRestExtensionContext { /** - * Provides an implementation of {@link javax.ws.rs.core.Configurable} that can be used to register JAX-RS resources. + * Provides an implementation of {@link jakarta.ws.rs.core.Configurable} that can be used to register JAX-RS resources. * - * @return the JAX-RS {@link javax.ws.rs.core.Configurable}; never {@code null} + * @return the JAX-RS {@link jakarta.ws.rs.core.Configurable}; never {@code null} */ Configurable> configurable(); diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java index 58aac7994aefc..a72f85d068888 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtension.java @@ -51,7 +51,7 @@ * * *

This is a reference implementation of the {@link ConnectRestExtension} interface. It registers an implementation of {@link - * javax.ws.rs.container.ContainerRequestFilter} that does JAAS based authentication of incoming Basic Auth credentials. {@link + * jakarta.ws.rs.container.ContainerRequestFilter} that does JAAS based authentication of incoming Basic Auth credentials. {@link * ConnectRestExtension} implementations are loaded via the plugin class loader using {@link java.util.ServiceLoader} mechanism and hence * the packaged jar includes {@code META-INF/services/org.apache.kafka.connect.rest.extension.ConnectRestExtension} with the entry * {@code org.apache.kafka.connect.extension.auth.jaas.BasicAuthSecurityRestExtension} diff --git a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java index b090ee21d449d..d404bdc7dc19a 100644 --- a/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java +++ b/connect/basic-auth-extension/src/main/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilter.java @@ -42,12 +42,13 @@ import javax.security.auth.login.Configuration; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.Priorities; -import javax.ws.rs.container.ContainerRequestContext; -import javax.ws.rs.container.ContainerRequestFilter; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.SecurityContext; + +import jakarta.ws.rs.HttpMethod; +import jakarta.ws.rs.Priorities; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.ContainerRequestFilter; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.SecurityContext; @Priority(Priorities.AUTHENTICATION) public class JaasBasicAuthFilter implements ContainerRequestFilter { diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java index 146bd6a2adf63..81f3a7327d576 100644 --- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java +++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/BasicAuthSecurityRestExtensionTest.java @@ -31,7 +31,8 @@ import java.util.function.Supplier; import javax.security.auth.login.Configuration; -import javax.ws.rs.core.Configurable; + +import jakarta.ws.rs.core.Configurable; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotEquals; diff --git a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java index 24ecadcc0a09b..bcd6e0ab31995 100644 --- a/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java +++ b/connect/basic-auth-extension/src/test/java/org/apache/kafka/connect/rest/basic/auth/extension/JaasBasicAuthFilterTest.java @@ -39,11 +39,12 @@ import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.ChoiceCallback; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.container.ContainerRequestContext; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.SecurityContext; -import javax.ws.rs.core.UriInfo; + +import jakarta.ws.rs.HttpMethod; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.SecurityContext; +import jakarta.ws.rs.core.UriInfo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java index 06480bcf4a5a4..4c02160a194f5 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java @@ -24,11 +24,11 @@ import java.util.Map; -import javax.inject.Inject; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.Path; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.UriInfo; +import jakarta.inject.Inject; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.UriInfo; @Path("/{source}/{target}/connectors") public class InternalMirrorResource extends InternalClusterResource { diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java index d20484e788525..2ba4438bdba9b 100644 --- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java +++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/DedicatedMirrorIntegrationTest.java @@ -280,7 +280,13 @@ public void testMultiNodeCluster() throws Exception { // Cluster aliases final String a = "A"; // Use a convoluted cluster name to ensure URL encoding/decoding works - final String b = "B- ._~:/?#[]@!$&'()*+;=\"<>%{}|\\^`618"; + // The servlet 6.0 spec no longer allows some characters such as forward slashes, control characters, + // etc. even if they are encoded. Jetty 12 will enforce this and throw a 400 ambiguous error + // so the string of characters for the variable "b" has been updated to only include characters + // that are valid with the new spec. + // See https://jakarta.ee/specifications/servlet/6.0/jakarta-servlet-spec-6.0#uri-path-canonicalization + // and specifically the section: "10. Rejecting Suspicious Sequences." for details. + final String b = "B-_~:?#[]@!$&'()*+=\"<>{}|^`618"; final String ab = a + "->" + b; final String ba = b + "->" + a; final String testTopicPrefix = "test-topic-"; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java index ff7a9d3149d58..0d2a664cbc251 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java @@ -112,8 +112,9 @@ import javax.crypto.KeyGenerator; import javax.crypto.SecretKey; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; + +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG; import static org.apache.kafka.common.utils.Utils.UncheckedCloseable; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java index bcd4fa18fc29c..ca2ab18d43b43 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestConfigurable.java @@ -24,8 +24,8 @@ import java.util.Map; import java.util.Objects; -import javax.ws.rs.core.Configurable; -import javax.ws.rs.core.Configuration; +import jakarta.ws.rs.core.Configurable; +import jakarta.ws.rs.core.Configuration; /** * The implementation delegates to {@link ResourceConfig} so that we can handle duplicate diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java index 8098f8c97cc53..1990ebdf36926 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestExtensionContextImpl.java @@ -20,7 +20,7 @@ import org.apache.kafka.connect.health.ConnectClusterState; import org.apache.kafka.connect.rest.ConnectRestExtensionContext; -import javax.ws.rs.core.Configurable; +import jakarta.ws.rs.core.Configurable; public class ConnectRestExtensionContextImpl implements ConnectRestExtensionContext { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java index dd38f769fe8ff..4dedc7289b8f4 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java @@ -33,9 +33,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; public class HerderRequestHandler { @@ -113,6 +113,7 @@ public T completeOrForwardRequest(FutureCallback cb, } String forwardUrl = uriBuilder.build().toString(); log.debug("Forwarding request {} {} {}", forwardUrl, method, body); + // TODO, we may need to set the request timeout as Idle timeout on the HttpClient return translator.translate(restClient.httpRequest(forwardUrl, method, headers, body, resultType)); } else { log.error("Request '{} {}' failed because it couldn't find the target Connect worker within two hops (between workers).", diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java index 6fe4134d1c52a..902187a83fc8a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignature.java @@ -20,7 +20,7 @@ import org.apache.kafka.connect.runtime.distributed.Crypto; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import java.security.InvalidKeyException; import java.security.MessageDigest; @@ -31,7 +31,8 @@ import javax.crypto.Mac; import javax.crypto.SecretKey; -import javax.ws.rs.core.HttpHeaders; + +import jakarta.ws.rs.core.HttpHeaders; public class InternalRequestSignature { @@ -59,8 +60,10 @@ public static void addToRequest(Crypto crypto, SecretKey key, byte[] requestBody throw new ConnectException(e); } byte[] requestSignature = sign(mac, key, requestBody); - request.header(InternalRequestSignature.SIGNATURE_HEADER, Base64.getEncoder().encodeToString(requestSignature)) - .header(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER, signatureAlgorithm); + request.headers(field -> { + field.add(InternalRequestSignature.SIGNATURE_HEADER, Base64.getEncoder().encodeToString(requestSignature)); + field.add(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER, signatureAlgorithm); + }); } /** diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java index a6db20ce64e54..511f7f9f2c7a2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestClient.java @@ -26,13 +26,15 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.jetty.client.ContentResponse; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.util.StringContentProvider; +import org.eclipse.jetty.client.Request; +import org.eclipse.jetty.client.StringRequestContent; +import org.eclipse.jetty.client.transport.HttpClientTransportDynamic; import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.io.ClientConnector; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +48,9 @@ import java.util.concurrent.TimeoutException; import javax.crypto.SecretKey; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.Response; + +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.Response; /** * Client for outbound REST requests to other members of a Connect cluster @@ -65,7 +68,15 @@ public RestClient(AbstractConfig config) { // VisibleForTesting HttpClient httpClient(SslContextFactory.Client sslContextFactory) { - return sslContextFactory != null ? new HttpClient(sslContextFactory) : new HttpClient(); + final HttpClient client; + if (sslContextFactory != null) { + ClientConnector clientConnector = new ClientConnector(); + clientConnector.setSslContextFactory(sslContextFactory); + client = new HttpClient(new HttpClientTransportDynamic(clientConnector)); + } else { + client = new HttpClient(); + } + return client; } /** @@ -162,7 +173,7 @@ private HttpResponse httpRequest(HttpClient client, String url, String me addHeadersToRequest(headers, req); if (serializedBody != null) { - req.content(new StringContentProvider(serializedBody, StandardCharsets.UTF_8), "application/json"); + req.body(new StringRequestContent("application/json", serializedBody, StandardCharsets.UTF_8)); } if (sessionKey != null && requestSignatureAlgorithm != null) { @@ -220,7 +231,7 @@ private static void addHeadersToRequest(HttpHeaders headers, Request req) { if (headers != null) { String credentialAuthorization = headers.getHeaderString(HttpHeaders.AUTHORIZATION); if (credentialAuthorization != null) { - req.header(HttpHeaders.AUTHORIZATION, credentialAuthorization); + req.headers(field -> field.add(HttpHeaders.AUTHORIZATION, credentialAuthorization)); } } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index 9468166763cea..b6c7690a51d79 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -28,8 +28,12 @@ import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper; import org.apache.kafka.connect.runtime.rest.util.SSLUtils; -import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; +import com.fasterxml.jackson.jakarta.rs.json.JacksonJsonProvider; +import org.eclipse.jetty.ee10.servlet.FilterHolder; +import org.eclipse.jetty.ee10.servlet.ServletContextHandler; +import org.eclipse.jetty.ee10.servlet.ServletHolder; +import org.eclipse.jetty.ee10.servlets.HeaderFilter; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.CustomRequestLog; import org.eclipse.jetty.server.Handler; @@ -37,12 +41,8 @@ import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.Slf4jRequestLogWriter; import org.eclipse.jetty.server.handler.ContextHandlerCollection; +import org.eclipse.jetty.server.handler.CrossOriginHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; -import org.eclipse.jetty.servlet.FilterHolder; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; -import org.eclipse.jetty.servlets.CrossOriginFilter; -import org.eclipse.jetty.servlets.HeaderFilter; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.glassfish.hk2.utilities.Binder; import org.glassfish.hk2.utilities.binding.AbstractBinder; @@ -60,12 +60,13 @@ import java.util.EnumSet; import java.util.List; import java.util.Locale; +import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; -import javax.servlet.DispatcherType; -import javax.ws.rs.core.UriBuilder; +import jakarta.servlet.DispatcherType; +import jakarta.ws.rs.core.UriBuilder; /** * Embedded server for the REST API that provides the control plane for Kafka Connect workers. @@ -189,6 +190,9 @@ public final Connector createConnector(String listener, boolean isAdmin) { connector.setPort(port); + // TODO: do we need this? + connector.setIdleTimeout(requestTimeout.timeoutMs()); + return connector; } @@ -263,20 +267,21 @@ protected final void initializeResources() { ServletHolder adminServletHolder = new ServletHolder(new ServletContainer(adminResourceConfig)); adminContext.setContextPath("/"); adminContext.addServlet(adminServletHolder, "/*"); - adminContext.setVirtualHosts(new String[]{"@" + ADMIN_SERVER_CONNECTOR_NAME}); + adminContext.setVirtualHosts(List.of("@" + ADMIN_SERVER_CONNECTOR_NAME)); contextHandlers.add(adminContext); } String allowedOrigins = config.allowedOrigins(); if (!Utils.isBlank(allowedOrigins)) { - FilterHolder filterHolder = new FilterHolder(new CrossOriginFilter()); - filterHolder.setName("cross-origin"); - filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_ORIGINS_PARAM, allowedOrigins); + CrossOriginHandler crossOriginHandler = new CrossOriginHandler(); + crossOriginHandler.setAllowedOriginPatterns(Set.of(allowedOrigins.split(","))); String allowedMethods = config.allowedMethods(); if (!Utils.isBlank(allowedMethods)) { - filterHolder.setInitParameter(CrossOriginFilter.ALLOWED_METHODS_PARAM, allowedMethods); + crossOriginHandler.setAllowedMethods(Set.of(allowedMethods.split(","))); } - context.addFilter(filterHolder, "/*", EnumSet.of(DispatcherType.REQUEST)); + // Setting to true matches the previously used CrossOriginFilter + crossOriginHandler.setDeliverPreflightRequests(true); + context.insertHandler(crossOriginHandler); } String headerConfig = config.responseHeaders(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java index 33bbb04b3f75c..1e33732dc58db 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/BadRequestException.java @@ -16,7 +16,7 @@ */ package org.apache.kafka.connect.runtime.rest.errors; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; public class BadRequestException extends ConnectRestException { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java index 9ce3e9e74d115..91c337c234b99 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java @@ -23,14 +23,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriInfo; -import javax.ws.rs.ext.ExceptionMapper; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriInfo; +import jakarta.ws.rs.ext.ExceptionMapper; /** - * Maps uncaught exceptions thrown while handling REST requests to appropriate {@link javax.ws.rs.core.Response}s + * Maps uncaught exceptions thrown while handling REST requests to appropriate {@link jakarta.ws.rs.core.Response}s */ public class ConnectExceptionMapper implements ExceptionMapper { private static final Logger log = LoggerFactory.getLogger(ConnectExceptionMapper.class); @@ -49,7 +49,7 @@ public Response toResponse(Exception exception) { .build(); } - if (exception instanceof NotFoundException || exception instanceof javax.ws.rs.NotFoundException) { + if (exception instanceof NotFoundException || exception instanceof jakarta.ws.rs.NotFoundException) { return Response.status(Response.Status.NOT_FOUND) .entity(new ErrorMessage(Response.Status.NOT_FOUND.getStatusCode(), exception.getMessage())) .build(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java index f45f72ddd8bd3..0d45ea578be86 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java @@ -18,7 +18,7 @@ import org.apache.kafka.connect.errors.ConnectException; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; public class ConnectRestException extends ConnectException { private final int statusCode; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 6de327bf5578b..800a8b2c1a3d2 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -41,21 +41,20 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; -import javax.inject.Inject; -import javax.ws.rs.BadRequestException; -import javax.ws.rs.Consumes; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; +import jakarta.inject.Inject; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DefaultValue; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; @Path("/connector-plugins") @Produces(MediaType.APPLICATION_JSON) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index dec053b0a4105..efbf39d790bef 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -46,29 +46,28 @@ import java.util.List; import java.util.Map; -import javax.inject.Inject; -import javax.servlet.ServletContext; -import javax.ws.rs.BadRequestException; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.PATCH; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; -import javax.ws.rs.core.UriInfo; - import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; +import jakarta.inject.Inject; +import jakarta.servlet.ServletContext; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.DefaultValue; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.PATCH; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; +import jakarta.ws.rs.core.UriInfo; import static org.apache.kafka.connect.runtime.rest.HerderRequestHandler.IdentityTranslator; import static org.apache.kafka.connect.runtime.rest.HerderRequestHandler.Translator; @@ -81,7 +80,7 @@ public class ConnectorsResource { private final Herder herder; private final HerderRequestHandler requestHandler; - @javax.ws.rs.core.Context + @jakarta.ws.rs.core.Context private ServletContext context; private final boolean isTopicTrackingDisabled; private final boolean isTopicTrackingResetDisabled; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java index b9756c381d99a..8ffec431f36de 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java @@ -30,18 +30,17 @@ import java.util.List; import java.util.Map; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.UriInfo; - import io.swagger.v3.oas.annotations.Operation; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.UriInfo; /** * Contains endpoints necessary for intra-cluster communication--that is, requests that @@ -66,7 +65,7 @@ protected InternalClusterResource(RestClient restClient, RestRequestTimeout requ /** * @return a {@link Herder} instance that can be used to satisfy the current request; may not be null - * @throws javax.ws.rs.NotFoundException if no such herder can be provided + * @throws jakarta.ws.rs.NotFoundException if no such herder can be provided */ protected abstract Herder herderForRequest(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java index 228c7cd67baf6..760d36a8fc3c3 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java @@ -20,8 +20,8 @@ import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestRequestTimeout; -import javax.inject.Inject; -import javax.ws.rs.Path; +import jakarta.inject.Inject; +import jakarta.ws.rs.Path; @Path("/connectors") public class InternalConnectResource extends InternalClusterResource { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java index 44aa617bd4bae..85be83061eb55 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java @@ -29,20 +29,19 @@ import java.util.Map; import java.util.Objects; -import javax.inject.Inject; -import javax.ws.rs.Consumes; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - import io.swagger.v3.oas.annotations.Operation; import io.swagger.v3.oas.annotations.Parameter; +import jakarta.inject.Inject; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DefaultValue; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; /** * A set of endpoints to adjust the log levels of runtime loggers. diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java index 8cdad7bc800f0..0af2983395ee0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java @@ -28,14 +28,13 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import javax.inject.Inject; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; - import io.swagger.v3.oas.annotations.Operation; +import jakarta.inject.Inject; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; @Path("/") @Produces(MediaType.APPLICATION_JSON) diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 948dcfaf1592c..54aa1bb19084e 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -63,7 +63,7 @@ import java.util.stream.IntStream; import java.util.stream.Stream; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 195905f3b76d3..079887c361d24 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -73,9 +73,9 @@ import java.util.stream.Collectors; import java.util.stream.IntStream; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; -import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; +import static jakarta.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; import static org.apache.kafka.clients.CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG; import static org.apache.kafka.common.config.AbstractConfig.CONFIG_PROVIDERS_CONFIG; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java index a95352dbdbf0c..9b76bf2ce64cb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorRestartApiIntegrationTest.java @@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java index a625dc983e8a8..d85ac9a440cb4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/InternalTopicsIntegrationTest.java @@ -36,7 +36,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java index 78c9a61406559..e0f395f442508 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/OffsetsApiIntegrationTest.java @@ -55,9 +55,9 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; -import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; +import static jakarta.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR; import static org.apache.kafka.connect.integration.MonitorableSourceConnector.TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java index 8ccc31baa86c9..1af52dba59f89 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java @@ -36,11 +36,11 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import javax.ws.rs.GET; -import javax.ws.rs.Path; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.Response; -import static javax.ws.rs.core.Response.Status.BAD_REQUEST; +import static jakarta.ws.rs.core.Response.Status.BAD_REQUEST; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java index 7969471918e1d..86473ffe613b4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java @@ -31,8 +31,8 @@ import java.util.Map; import java.util.concurrent.TimeUnit; -import static javax.ws.rs.core.Response.Status.BAD_REQUEST; -import static javax.ws.rs.core.Response.Status.FORBIDDEN; +import static jakarta.ws.rs.core.Response.Status.BAD_REQUEST; +import static jakarta.ws.rs.core.Response.Status.FORBIDDEN; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java index 69a65ba7bfbde..f13781c8ceabb 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StandaloneWorkerIntegrationTest.java @@ -40,7 +40,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; import static org.apache.kafka.connect.integration.BlockingConnectorTest.Block.BLOCK_CONFIG; import static org.apache.kafka.connect.integration.BlockingConnectorTest.CONNECTOR_START; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java index 6b4d066ca1016..2632360b7f6a4 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java @@ -114,9 +114,9 @@ import javax.crypto.SecretKey; +import static jakarta.ws.rs.core.Response.Status.FORBIDDEN; +import static jakarta.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; import static java.util.Collections.singletonList; -import static javax.ws.rs.core.Response.Status.FORBIDDEN; -import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE; import static org.apache.kafka.common.utils.Utils.UncheckedCloseable; import static org.apache.kafka.connect.runtime.AbstractStatus.State.FAILED; import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java index 3c99091740088..d0559123b7251 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java @@ -463,9 +463,7 @@ private static ThreadFactory threadFactoryWithNamedThreads(String threadPrefix) return r -> { // This is essentially Executors.defaultThreadFactory except with // custom thread names so in order to filter by thread names when debugging - SecurityManager s = System.getSecurityManager(); - Thread t = new Thread((s != null) ? s.getThreadGroup() : - Thread.currentThread().getThreadGroup(), r, + Thread t = new Thread(Thread.currentThread().getThreadGroup(), r, threadPrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java index 9a686aa81fdff..58ca25ce0f023 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java @@ -58,7 +58,7 @@ import java.util.HashMap; import java.util.Map; -import javax.ws.rs.core.MediaType; +import jakarta.ws.rs.core.MediaType; import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignatureTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignatureTest.java index 4d37b7e67b76d..394031e0df105 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignatureTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/InternalRequestSignatureTest.java @@ -21,21 +21,23 @@ import org.apache.kafka.connect.runtime.distributed.Crypto; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.Request; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.ArgumentCaptor; import org.mockito.junit.jupiter.MockitoExtension; import org.mockito.junit.jupiter.MockitoSettings; import org.mockito.quality.Strictness; +import java.net.URI; import java.security.NoSuchAlgorithmException; import java.util.Base64; import javax.crypto.Mac; import javax.crypto.SecretKey; import javax.crypto.spec.SecretKeySpec; -import javax.ws.rs.core.HttpHeaders; + +import jakarta.ws.rs.core.HttpHeaders; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -113,25 +115,16 @@ public void addToRequestShouldThrowExceptionOnInvalidSignatureAlgorithm() throws @Test public void addToRequestShouldAddHeadersOnValidSignatureAlgorithm() { - Request request = mock(Request.class); - ArgumentCaptor signatureCapture = ArgumentCaptor.forClass(String.class); - ArgumentCaptor signatureAlgorithmCapture = ArgumentCaptor.forClass(String.class); - when(request.header( - eq(InternalRequestSignature.SIGNATURE_HEADER), - signatureCapture.capture() - )).thenReturn(request); - when(request.header( - eq(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER), - signatureAlgorithmCapture.capture() - )).thenReturn(request); + HttpClient httpClient = new HttpClient(); + Request request = httpClient.newRequest(URI.create("http://localhost")); InternalRequestSignature.addToRequest(crypto, KEY, REQUEST_BODY, SIGNATURE_ALGORITHM, request); assertEquals(ENCODED_SIGNATURE, - signatureCapture.getValue(), + request.getHeaders().get(InternalRequestSignature.SIGNATURE_HEADER), "Request should have valid base 64-encoded signature added as header"); assertEquals(SIGNATURE_ALGORITHM, - signatureAlgorithmCapture.getValue(), + request.getHeaders().get(InternalRequestSignature.SIGNATURE_ALGORITHM_HEADER), "Request should have provided signature algorithm added as header"); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java index 75e224cfa32bb..b5449daa81202 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestClientTest.java @@ -25,9 +25,9 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.jetty.client.ContentResponse; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.Request; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; @@ -45,7 +45,8 @@ import java.util.stream.Stream; import javax.crypto.SecretKey; -import javax.ws.rs.core.Response; + +import jakarta.ws.rs.core.Response; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -118,7 +119,7 @@ private static Stream requestExceptions() { private static Request buildThrowingMockRequest(Throwable t) throws ExecutionException, InterruptedException, TimeoutException { Request req = mock(Request.class); - when(req.header(anyString(), anyString())).thenReturn(req); + when(req.headers(any())).thenReturn(req); when(req.send()).thenThrow(t); return req; } @@ -310,7 +311,7 @@ public void testUseSslConfigsOnlyWhenNecessary() throws Exception { public void testHttpRequestInterrupted() throws ExecutionException, InterruptedException, TimeoutException { Request req = mock(Request.class); doThrow(new InterruptedException()).when(req).send(); - doReturn(req).when(req).header(anyString(), anyString()); + doReturn(req).when(req).headers(any()); doReturn(req).when(httpClient).newRequest(anyString()); ConnectRestException e = assertThrows(ConnectRestException.class, () -> httpRequest( httpClient, MOCK_URL, TEST_METHOD, TEST_TYPE, TEST_SIGNATURE_ALGORITHM @@ -323,7 +324,7 @@ public void testHttpRequestInterrupted() throws ExecutionException, InterruptedE private void setupHttpClient(int responseCode, Request req, ContentResponse resp) throws Exception { when(resp.getStatus()).thenReturn(responseCode); when(req.send()).thenReturn(resp); - when(req.header(anyString(), anyString())).thenReturn(req); + when(req.headers(any())).thenReturn(req); when(httpClient.newRequest(anyString())).thenReturn(req); } @@ -356,4 +357,4 @@ public int hashCode() { return Objects.hash(content); } } -} \ No newline at end of file +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java index eb34d619d5cc9..bec99d2d55c16 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java @@ -84,7 +84,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import javax.ws.rs.BadRequestException; +import jakarta.ws.rs.BadRequestException; import static java.util.Arrays.asList; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_HEALTH_CHECK_TIMEOUT_MS; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index 700284a9c66ee..9dfead77220f6 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -64,12 +64,12 @@ import java.util.Map; import java.util.Set; -import javax.ws.rs.BadRequestException; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.MultivaluedHashMap; -import javax.ws.rs.core.MultivaluedMap; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriInfo; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.MultivaluedHashMap; +import jakarta.ws.rs.core.MultivaluedMap; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriInfo; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_HEALTH_CHECK_TIMEOUT_MS; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java index 0e24f86695169..aee85a86c2ab2 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java @@ -44,8 +44,9 @@ import java.util.Map; import javax.crypto.Mac; -import javax.ws.rs.core.HttpHeaders; -import javax.ws.rs.core.UriInfo; + +import jakarta.ws.rs.core.HttpHeaders; +import jakarta.ws.rs.core.UriInfo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResourceTest.java index 916de425bd984..c73bba8c84368 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResourceTest.java @@ -35,7 +35,7 @@ import java.util.Collections; import java.util.List; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; @@ -167,4 +167,4 @@ public void testSetLevelClusterScope() { verify(herder).setClusterLoggerLevel(logger, level); } -} \ No newline at end of file +} diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java index dfdf081227cf3..459bc58201392 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java @@ -37,7 +37,7 @@ import java.util.concurrent.TimeoutException; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_HEALTH_CHECK_TIMEOUT_MS; import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java index a3dc0efef99d4..408f4cb886b29 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/util/SSLUtilsTest.java @@ -17,9 +17,12 @@ package org.apache.kafka.connect.runtime.rest.util; import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.config.types.Password; +import org.apache.kafka.common.network.CertStores; import org.apache.kafka.connect.runtime.rest.RestServerConfig; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import java.util.Arrays; @@ -33,6 +36,22 @@ public class SSLUtilsTest { + private Map sslConfig; + private String keystorePath; + private String truststorePath; + private Password keystorePassword; + private Password truststorePassword; + + @BeforeEach + public void before() throws Exception { + CertStores serverCertStores = new CertStores(true, "localhost"); + sslConfig = serverCertStores.getUntrustingConfig(); + keystorePath = sslConfig.get("ssl.keystore.location").toString(); + truststorePath = sslConfig.get("ssl.truststore.location").toString(); + keystorePassword = (Password) sslConfig.get("ssl.keystore.password"); + truststorePassword = (Password) sslConfig.get("ssl.keystore.password"); + } + @Test public void testGetOrDefault() { String existingKey = "exists"; @@ -47,13 +66,13 @@ public void testGetOrDefault() { } @Test - public void testCreateServerSideSslContextFactory() { + public void testCreateServerSideSslContextFactory() throws Exception { Map configMap = new HashMap<>(); - configMap.put("ssl.keystore.location", "/path/to/keystore"); - configMap.put("ssl.keystore.password", "123456"); - configMap.put("ssl.key.password", "123456"); - configMap.put("ssl.truststore.location", "/path/to/truststore"); - configMap.put("ssl.truststore.password", "123456"); + configMap.put("ssl.keystore.location", keystorePath); + configMap.put("ssl.keystore.password", keystorePassword.value()); + configMap.put("ssl.key.password", keystorePassword.value()); + configMap.put("ssl.truststore.location", truststorePath); + configMap.put("ssl.truststore.password", truststorePassword.value()); configMap.put("ssl.provider", "SunJSSE"); configMap.put("ssl.cipher.suites", "SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5"); configMap.put("ssl.secure.random.implementation", "SHA1PRNG"); @@ -69,8 +88,8 @@ public void testCreateServerSideSslContextFactory() { RestServerConfig config = RestServerConfig.forPublic(null, configMap); SslContextFactory.Server ssl = SSLUtils.createServerSideSslContextFactory(config); - assertEquals("file:///path/to/keystore", ssl.getKeyStorePath()); - assertEquals("file:///path/to/truststore", ssl.getTrustStorePath()); + assertEquals("file://" + keystorePath, ssl.getKeyStorePath()); + assertEquals("file://" + truststorePath, ssl.getTrustStorePath()); assertEquals("SunJSSE", ssl.getProvider()); assertArrayEquals(new String[] {"SSL_RSA_WITH_RC4_128_SHA", "SSL_RSA_WITH_RC4_128_MD5"}, ssl.getIncludeCipherSuites()); assertEquals("SHA1PRNG", ssl.getSecureRandomAlgorithm()); @@ -87,11 +106,11 @@ public void testCreateServerSideSslContextFactory() { @Test public void testCreateClientSideSslContextFactory() { Map configMap = new HashMap<>(); - configMap.put("ssl.keystore.location", "/path/to/keystore"); - configMap.put("ssl.keystore.password", "123456"); - configMap.put("ssl.key.password", "123456"); - configMap.put("ssl.truststore.location", "/path/to/truststore"); - configMap.put("ssl.truststore.password", "123456"); + configMap.put("ssl.keystore.location", keystorePath); + configMap.put("ssl.keystore.password", keystorePassword.value()); + configMap.put("ssl.key.password", keystorePassword.value()); + configMap.put("ssl.truststore.location", truststorePath); + configMap.put("ssl.truststore.password", truststorePassword.value()); configMap.put("ssl.provider", "SunJSSE"); configMap.put("ssl.cipher.suites", "SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5"); configMap.put("ssl.secure.random.implementation", "SHA1PRNG"); @@ -107,8 +126,8 @@ public void testCreateClientSideSslContextFactory() { RestServerConfig config = RestServerConfig.forPublic(null, configMap); SslContextFactory.Client ssl = SSLUtils.createClientSideSslContextFactory(config); - assertEquals("file:///path/to/keystore", ssl.getKeyStorePath()); - assertEquals("file:///path/to/truststore", ssl.getTrustStorePath()); + assertEquals("file://" + keystorePath, ssl.getKeyStorePath()); + assertEquals("file://" + truststorePath, ssl.getTrustStorePath()); assertEquals("SunJSSE", ssl.getProvider()); assertArrayEquals(new String[] {"SSL_RSA_WITH_RC4_128_SHA", "SSL_RSA_WITH_RC4_128_MD5"}, ssl.getIncludeCipherSuites()); assertEquals("SHA1PRNG", ssl.getSecureRandomAlgorithm()); @@ -123,11 +142,11 @@ public void testCreateClientSideSslContextFactory() { @Test public void testCreateServerSideSslContextFactoryDefaultValues() { Map configMap = new HashMap<>(); - configMap.put("ssl.keystore.location", "/path/to/keystore"); - configMap.put("ssl.keystore.password", "123456"); - configMap.put("ssl.key.password", "123456"); - configMap.put("ssl.truststore.location", "/path/to/truststore"); - configMap.put("ssl.truststore.password", "123456"); + configMap.put("ssl.keystore.location", keystorePath); + configMap.put("ssl.keystore.password", keystorePassword.value()); + configMap.put("ssl.key.password", keystorePassword.value()); + configMap.put("ssl.truststore.location", truststorePath); + configMap.put("ssl.truststore.password", truststorePassword.value()); configMap.put("ssl.provider", "SunJSSE"); configMap.put("ssl.cipher.suites", "SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5"); configMap.put("ssl.secure.random.implementation", "SHA1PRNG"); @@ -148,11 +167,11 @@ public void testCreateServerSideSslContextFactoryDefaultValues() { @Test public void testCreateClientSideSslContextFactoryDefaultValues() { Map configMap = new HashMap<>(); - configMap.put("ssl.keystore.location", "/path/to/keystore"); - configMap.put("ssl.keystore.password", "123456"); - configMap.put("ssl.key.password", "123456"); - configMap.put("ssl.truststore.location", "/path/to/truststore"); - configMap.put("ssl.truststore.password", "123456"); + configMap.put("ssl.keystore.location", keystorePath); + configMap.put("ssl.keystore.password", keystorePassword.value()); + configMap.put("ssl.key.password", keystorePassword.value()); + configMap.put("ssl.truststore.location", truststorePath); + configMap.put("ssl.truststore.password", truststorePassword.value()); configMap.put("ssl.provider", "SunJSSE"); configMap.put("ssl.cipher.suites", "SSL_RSA_WITH_RC4_128_SHA,SSL_RSA_WITH_RC4_128_MD5"); configMap.put("ssl.secure.random.implementation", "SHA1PRNG"); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java index c30c78ad7160a..8dc22edb86309 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/ConnectAssertions.java @@ -37,7 +37,7 @@ import java.util.function.Predicate; import java.util.stream.Collectors; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; import static org.apache.kafka.test.TestUtils.waitForCondition; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java index e76ccf9ed2c3e..b576cda56a75d 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java @@ -37,14 +37,15 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.jetty.client.ContentResponse; import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.client.api.ContentResponse; -import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.client.util.StringContentProvider; +import org.eclipse.jetty.client.Request; +import org.eclipse.jetty.client.StringRequestContent; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.Collection; import java.util.Collections; import java.util.HashSet; @@ -55,7 +56,9 @@ import java.util.Set; import java.util.stream.Collectors; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; + +import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; abstract class EmbeddedConnect { @@ -81,6 +84,10 @@ protected EmbeddedConnect( this.kafkaCluster = new EmbeddedKafkaCluster(numBrokers, brokerProps, clientProps); this.maskExitProcedures = maskExitProcedures; this.httpClient = new HttpClient(); + // Necessary to prevent the rest request from timing out too early + // Before this change,ConnectWorkerIntegrationTest#testPollTimeoutExpiry() was failing + // because the request was being stopped by jetty before the framework responded + this.httpClient.setIdleTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS); this.assertions = new ConnectAssertions(this); // we should keep the original class loader and set it back after connector stopped since the connector will change the class loader, // and then, the Mockito will use the unexpected class loader to generate the wrong proxy instance, which makes mock failed @@ -992,8 +999,8 @@ protected Response requestHttpMethod(String url, String body, Map headers.forEach(mutable::add)); + req.body(new StringRequestContent("application/json", body, StandardCharsets.UTF_8)); } ContentResponse res = req.send(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java index 66ce78d0d1bab..5678b97bb1314 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectStandalone.java @@ -37,7 +37,7 @@ import java.util.Properties; import java.util.Set; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.core.Response; import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG; diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 06176436a8c63..1dd01d5e0ebc9 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -70,15 +70,15 @@ versions += [ jackson: "2.16.2", jacoco: "0.8.10", javassist: "3.29.2-GA", - jetty: "9.4.56.v20240826", - jersey: "2.39.1", + jetty: "12.0.15", + jersey: "3.1.9", jline: "3.25.1", jmh: "1.37", hamcrest: "2.2", scalaLogging: "3.9.5", jaxAnnotation: "1.3.2", jaxb: "2.3.1", - jaxrs: "2.1.1", + jakartaRs: "3.1.0", jfreechart: "1.0.0", jopt: "5.0.4", jose4j: "0.9.4", @@ -159,15 +159,15 @@ libs += [ jacksonModuleScala: "com.fasterxml.jackson.module:jackson-module-scala_$versions.baseScala:$versions.jackson", jacksonJDK8Datatypes: "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:$versions.jackson", jacksonBlackbird: "com.fasterxml.jackson.module:jackson-module-blackbird:$versions.jackson", - jacksonJaxrsJsonProvider: "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:$versions.jackson", + jacksonJakartarsJsonProvider: "com.fasterxml.jackson.jakarta.rs:jackson-jakarta-rs-json-provider:$versions.jackson", jaxAnnotationApi: "javax.annotation:javax.annotation-api:$versions.jaxAnnotation", jaxbApi: "javax.xml.bind:jaxb-api:$versions.jaxb", - jaxrsApi: "javax.ws.rs:javax.ws.rs-api:$versions.jaxrs", + jakartaRsApi: "jakarta.ws.rs:jakarta.ws.rs-api:$versions.jakartaRs", javassist: "org.javassist:javassist:$versions.javassist", jettyServer: "org.eclipse.jetty:jetty-server:$versions.jetty", jettyClient: "org.eclipse.jetty:jetty-client:$versions.jetty", - jettyServlet: "org.eclipse.jetty:jetty-servlet:$versions.jetty", - jettyServlets: "org.eclipse.jetty:jetty-servlets:$versions.jetty", + jettyServlet: "org.eclipse.jetty.ee10:jetty-ee10-servlet:$versions.jetty", + jettyServlets: "org.eclipse.jetty.ee10:jetty-ee10-servlets:$versions.jetty", jerseyContainerServlet: "org.glassfish.jersey.containers:jersey-container-servlet:$versions.jersey", jerseyHk2: "org.glassfish.jersey.inject:jersey-hk2:$versions.jersey", jline: "org.jline:jline:$versions.jline", diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index b5a3c9bd96e9e..8b6ca9f522c70 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -40,6 +40,13 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read + + + + + diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java index 91dd6ee1d9787..fb3a7d8162ffd 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java @@ -49,7 +49,7 @@ import java.util.List; import java.util.Map; -import javax.ws.rs.core.UriBuilder; +import jakarta.ws.rs.core.UriBuilder; import static net.sourceforge.argparse4j.impl.Arguments.store; import static net.sourceforge.argparse4j.impl.Arguments.storeTrue; diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java index 09f5d5bb3ebb9..b8b209d974ee6 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/agent/AgentRestResource.java @@ -25,17 +25,17 @@ import java.util.concurrent.atomic.AtomicReference; -import javax.servlet.ServletContext; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; +import jakarta.servlet.ServletContext; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.DefaultValue; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; /** * The REST resource for the Agent. This describes the RPCs which the agent can accept. @@ -54,7 +54,7 @@ public class AgentRestResource { private final AtomicReference agent = new AtomicReference<>(null); - @javax.ws.rs.core.Context + @jakarta.ws.rs.core.Context private ServletContext context; public void setAgent(Agent myAgent) { diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java index 00c26adc4c4c8..84880fdd86ffc 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java @@ -63,8 +63,8 @@ import java.util.regex.Pattern; import java.util.regex.PatternSyntaxException; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.core.UriBuilder; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.core.UriBuilder; import static net.sourceforge.argparse4j.impl.Arguments.append; import static net.sourceforge.argparse4j.impl.Arguments.store; diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java index a65c4d26a73a1..5d0ad96e17f23 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorRestResource.java @@ -33,20 +33,20 @@ import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; -import javax.servlet.ServletContext; -import javax.ws.rs.Consumes; -import javax.ws.rs.DELETE; -import javax.ws.rs.DefaultValue; -import javax.ws.rs.GET; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.POST; -import javax.ws.rs.PUT; -import javax.ws.rs.Path; -import javax.ws.rs.PathParam; -import javax.ws.rs.Produces; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; +import jakarta.servlet.ServletContext; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.DefaultValue; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.PUT; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.PathParam; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.QueryParam; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; /** * The REST resource for the Coordinator. This describes the RPCs which the coordinator @@ -66,7 +66,7 @@ public class CoordinatorRestResource { private final AtomicReference coordinator = new AtomicReference<>(); - @javax.ws.rs.core.Context + @jakarta.ws.rs.core.Context private ServletContext context; public void setCoordinator(Coordinator myCoordinator) { diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java index 44e69ee2dc77f..108642ca1052c 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java @@ -21,20 +21,19 @@ import org.apache.kafka.trogdor.common.JsonUtil; import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; +import com.fasterxml.jackson.jakarta.rs.json.JacksonJsonProvider; +import org.eclipse.jetty.ee10.servlet.ServletContextHandler; +import org.eclipse.jetty.ee10.servlet.ServletHolder; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.CustomRequestLog; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.server.Slf4jRequestLogWriter; +import org.eclipse.jetty.server.handler.ContextHandlerCollection; import org.eclipse.jetty.server.handler.DefaultHandler; -import org.eclipse.jetty.server.handler.HandlerCollection; -import org.eclipse.jetty.server.handler.RequestLogHandler; import org.eclipse.jetty.server.handler.StatisticsHandler; -import org.eclipse.jetty.servlet.ServletContextHandler; -import org.eclipse.jetty.servlet.ServletHolder; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.servlet.ServletContainer; import org.slf4j.Logger; @@ -102,14 +101,13 @@ public void start(Object... resources) { context.setContextPath("/"); context.addServlet(servletHolder, "/*"); - RequestLogHandler requestLogHandler = new RequestLogHandler(); Slf4jRequestLogWriter slf4jRequestLogWriter = new Slf4jRequestLogWriter(); slf4jRequestLogWriter.setLoggerName(JsonRestServer.class.getCanonicalName()); CustomRequestLog requestLog = new CustomRequestLog(slf4jRequestLogWriter, CustomRequestLog.EXTENDED_NCSA_FORMAT + " %{ms}T"); - requestLogHandler.setRequestLog(requestLog); + jettyServer.setRequestLog(requestLog); - HandlerCollection handlers = new HandlerCollection(); - handlers.setHandlers(new Handler[]{context, new DefaultHandler(), requestLogHandler}); + ContextHandlerCollection handlers = new ContextHandlerCollection(); + handlers.setHandlers(new Handler[]{context, new DefaultHandler()}); StatisticsHandler statsHandler = new StatisticsHandler(); statsHandler.setHandler(handlers); jettyServer.setHandler(statsHandler); diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java index 2c1a046b480bf..db747030abe47 100644 --- a/trogdor/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java +++ b/trogdor/src/main/java/org/apache/kafka/trogdor/rest/RestExceptionMapper.java @@ -25,9 +25,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.core.Response; -import javax.ws.rs.ext.ExceptionMapper; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.ext.ExceptionMapper; public class RestExceptionMapper implements ExceptionMapper { private static final Logger log = LoggerFactory.getLogger(RestExceptionMapper.class); diff --git a/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java b/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java index 849e2713d9e85..313a5db743741 100644 --- a/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java +++ b/trogdor/src/test/java/org/apache/kafka/trogdor/coordinator/CoordinatorTest.java @@ -62,7 +62,7 @@ import java.util.List; import java.util.Optional; -import javax.ws.rs.NotFoundException; +import jakarta.ws.rs.NotFoundException; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; diff --git a/trogdor/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java b/trogdor/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java index e446fec68b701..c0f1248ce2257 100644 --- a/trogdor/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java +++ b/trogdor/src/test/java/org/apache/kafka/trogdor/rest/RestExceptionMapperTest.java @@ -26,8 +26,8 @@ import org.junit.jupiter.api.Test; -import javax.ws.rs.NotFoundException; -import javax.ws.rs.core.Response; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.core.Response; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows;