From 790ca3340102dd548918480eddc3832424ed5ee5 Mon Sep 17 00:00:00 2001 From: "alexandre.teilhet" Date: Fri, 4 Oct 2024 17:58:46 +0200 Subject: [PATCH 1/3] [DL-717] replace spotify docker client for java mainstream one --- .../cortex/services/DockerJobRunnerSrv.scala | 205 ++++++++++-------- .../thp/cortex/util/docker/DockerClient.scala | 167 ++++++++++++++ .../util/docker/DockerLogsStringBuilder.scala | 11 + build.sbt | 2 + conf/application.sample | 37 ++++ project/Dependencies.scala | 19 +- 6 files changed, 344 insertions(+), 97 deletions(-) create mode 100644 app/org/thp/cortex/util/docker/DockerClient.scala create mode 100644 app/org/thp/cortex/util/docker/DockerLogsStringBuilder.scala diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala index 298e4f167..45bbef979 100644 --- a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala @@ -1,15 +1,17 @@ package org.thp.cortex.services import akka.actor.ActorSystem -import com.spotify.docker.client.DockerClient.LogsParam -import com.spotify.docker.client.messages.HostConfig.Bind -import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} -import com.spotify.docker.client.{DefaultDockerClient, DockerClient} +//import com.spotify.docker.client.DockerClient.LogsParam +//import com.spotify.docker.client.messages.HostConfig.Bind +//import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} +//import com.spotify.docker.client.{DefaultDockerClient, DockerClient} +import org.thp.cortex.util.docker.{DockerClient => DockerJavaClient} import play.api.libs.json.Json import play.api.{Configuration, Logger} import java.nio.charset.StandardCharsets import java.nio.file._ +import java.util.concurrent.TimeUnit import javax.inject.{Inject, Singleton} import scala.concurrent.ExecutionContext import scala.concurrent.duration.FiniteDuration @@ -17,8 +19,7 @@ import scala.util.Try @Singleton class DockerJobRunnerSrv( - client: DockerClient, - config: Configuration, + javaClient: DockerJavaClient, autoUpdate: Boolean, jobBaseDirectory: Path, dockerJobBaseDirectory: Path, @@ -28,17 +29,17 @@ class DockerJobRunnerSrv( @Inject() def this(config: Configuration, system: ActorSystem) = this( - new DefaultDockerClient.Builder() - .apiVersion(config.getOptional[String]("docker.version").orNull) - .connectionPoolSize(config.getOptional[Int]("docker.connectionPoolSize").getOrElse(100)) - .connectTimeoutMillis(config.getOptional[Long]("docker.connectTimeoutMillis").getOrElse(5000)) - //.dockerCertificates() - .readTimeoutMillis(config.getOptional[Long]("docker.readTimeoutMillis").getOrElse(30000)) - //.registryAuthSupplier() - .uri(config.getOptional[String]("docker.uri").getOrElse("unix:///var/run/docker.sock")) - .useProxy(config.getOptional[Boolean]("docker.useProxy").getOrElse(false)) - .build(), - config, +// new DefaultDockerClient.Builder() +// .apiVersion(config.getOptional[String]("docker.version").orNull) +// .connectionPoolSize(config.getOptional[Int]("docker.connectionPoolSize").getOrElse(100)) +// .connectTimeoutMillis(config.getOptional[Long]("docker.connectTimeoutMillis").getOrElse(5000)) +// //.dockerCertificates() +// .readTimeoutMillis(config.getOptional[Long]("docker.readTimeoutMillis").getOrElse(30000)) +// //.registryAuthSupplier() +// .uri(config.getOptional[String]("docker.uri").getOrElse("unix:///var/run/docker.sock")) +// .useProxy(config.getOptional[Boolean]("docker.useProxy").getOrElse(false)) +// .build(), + new DockerJavaClient(config), config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true), Paths.get(config.get[String]("job.directory")), Paths.get(config.get[String]("job.dockerDirectory")), @@ -50,7 +51,7 @@ class DockerJobRunnerSrv( lazy val isAvailable: Boolean = Try { logger.debug(s"Retrieve docker information ...") - logger.info(s"Docker is available:\n${client.info()}") + logger.info(s"Docker is available:\n${javaClient.info}") true }.recover { case error => @@ -58,81 +59,107 @@ class DockerJobRunnerSrv( false }.get - def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit - ec: ExecutionContext - ): Try[Unit] = { - import scala.collection.JavaConverters._ - if (autoUpdate) Try(client.pull(dockerImage)) - // ContainerConfig.builder().addVolume() - val hostConfigBuilder = HostConfig.builder() - config.getOptional[Seq[String]]("docker.container.capAdd").map(_.asJava).foreach(hostConfigBuilder.capAdd) - config.getOptional[Seq[String]]("docker.container.capDrop").map(_.asJava).foreach(hostConfigBuilder.capDrop) - config.getOptional[String]("docker.container.cgroupParent").foreach(hostConfigBuilder.cgroupParent) - config.getOptional[Long]("docker.container.cpuPeriod").foreach(hostConfigBuilder.cpuPeriod(_)) - config.getOptional[Long]("docker.container.cpuQuota").foreach(hostConfigBuilder.cpuQuota(_)) - config.getOptional[Seq[String]]("docker.container.dns").map(_.asJava).foreach(hostConfigBuilder.dns) - config.getOptional[Seq[String]]("docker.container.dnsSearch").map(_.asJava).foreach(hostConfigBuilder.dnsSearch) - config.getOptional[Seq[String]]("docker.container.extraHosts").map(_.asJava).foreach(hostConfigBuilder.extraHosts) - config.getOptional[Long]("docker.container.kernelMemory").foreach(hostConfigBuilder.kernelMemory(_)) - config.getOptional[Long]("docker.container.memoryReservation").foreach(hostConfigBuilder.memoryReservation(_)) - config.getOptional[Long]("docker.container.memory").foreach(hostConfigBuilder.memory(_)) - config.getOptional[Long]("docker.container.memorySwap").foreach(hostConfigBuilder.memorySwap(_)) - config.getOptional[Int]("docker.container.memorySwappiness").foreach(hostConfigBuilder.memorySwappiness(_)) - config.getOptional[String]("docker.container.networkMode").foreach(hostConfigBuilder.networkMode) - config.getOptional[Boolean]("docker.container.privileged").foreach(hostConfigBuilder.privileged(_)) - hostConfigBuilder.appendBinds( - Bind - .from(dockerJobBaseDirectory.resolve(jobBaseDirectory.relativize(jobDirectory)).toAbsolutePath.toString) - .to("/job") - .readOnly(false) - .build() - ) - val cacertsFile = jobDirectory.resolve("input").resolve("cacerts") - val containerConfigBuilder = ContainerConfig - .builder() - .hostConfig(hostConfigBuilder.build()) - .image(dockerImage) - .cmd("/job") + private def generateErrorOutput(containerId: String, f: Path) = { + logger.warn(s"the runner didn't generate any output file $f") + for { + output <- javaClient.getLogs(containerId) + report = Json.obj("success" -> false, "errorMessage" -> output) + _ <- Try(Files.write(f, report.toString.getBytes(StandardCharsets.UTF_8))) + } yield report + } - val containerConfig = - if (Files.exists(cacertsFile)) containerConfigBuilder.env(s"REQUESTS_CA_BUNDLE=/job/input/cacerts").build() - else containerConfigBuilder.build() - val containerCreation = client.createContainer(containerConfig) - // Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn) + def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit executionContext: ExecutionContext): Try[Unit] = { + val to = timeout.getOrElse(FiniteDuration(5000, TimeUnit.SECONDS)) - logger.debug(s"Container configuration: $containerConfig") - logger.info( - s"Execute container ${containerCreation.id()}\n" + - s" timeout: ${timeout.fold("none")(_.toString)}\n" + - s" image : $dockerImage\n" + - s" volume : ${jobDirectory.toAbsolutePath}:/job" + - Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString) - ) + if (autoUpdate) Try(javaClient.pullImage(dockerImage, to)) - val timeoutSched = timeout.map(to => - system.scheduler.scheduleOnce(to) { - logger.info("Timeout reached, stopping the container") - client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) - } - ) - val execution = Try { - client.startContainer(containerCreation.id()) - client.waitContainer(containerCreation.id()) - () - } - timeoutSched.foreach(_.cancel()) - val outputFile = jobDirectory.resolve("output").resolve("output.json") - if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { - logger.warn(s"The worker didn't generate output file.") - val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully()) - .fold(e => s"Container logs can't be read (${e.getMessage})", identity) - val message = execution.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output) + for { + containerId <- javaClient.prepare(dockerImage, jobDirectory, jobBaseDirectory, dockerJobBaseDirectory, to) + timeoutScheduled = timeout.map(to => + system.scheduler.scheduleOnce(to) { + logger.info("Timeout reached, stopping the container") + javaClient.clean(containerId) + } + ) + _ <- javaClient.execute(containerId) + _ = timeoutScheduled.foreach(_.cancel()) + outputFile <- Try(jobDirectory.resolve("output").resolve("output.json")) + isError = Files.notExists(outputFile) || Files.size(outputFile) == 0 || Files.isDirectory(outputFile) + _ = if (isError) generateErrorOutput(containerId, outputFile).toOption else None + _ <- javaClient.clean(containerId) + } yield () + + // import scala.collection.JavaConverters._ + // ContainerConfig.builder().addVolume() +// val hostConfigBuilder = HostConfig.builder() +// config.getOptional[Seq[String]]("docker.container.capAdd").map(_.asJava).foreach(hostConfigBuilder.capAdd) +// config.getOptional[Seq[String]]("docker.container.capDrop").map(_.asJava).foreach(hostConfigBuilder.capDrop) +// config.getOptional[String]("docker.container.cgroupParent").foreach(hostConfigBuilder.cgroupParent) +// config.getOptional[Long]("docker.container.cpuPeriod").foreach(hostConfigBuilder.cpuPeriod(_)) +// config.getOptional[Long]("docker.container.cpuQuota").foreach(hostConfigBuilder.cpuQuota(_)) +// config.getOptional[Seq[String]]("docker.container.dns").map(_.asJava).foreach(hostConfigBuilder.dns) +// config.getOptional[Seq[String]]("docker.container.dnsSearch").map(_.asJava).foreach(hostConfigBuilder.dnsSearch) +// config.getOptional[Seq[String]]("docker.container.extraHosts").map(_.asJava).foreach(hostConfigBuilder.extraHosts) +// config.getOptional[Long]("docker.container.kernelMemory").foreach(hostConfigBuilder.kernelMemory(_)) +// config.getOptional[Long]("docker.container.memoryReservation").foreach(hostConfigBuilder.memoryReservation(_)) +// config.getOptional[Long]("docker.container.memory").foreach(hostConfigBuilder.memory(_)) +// config.getOptional[Long]("docker.container.memorySwap").foreach(hostConfigBuilder.memorySwap(_)) +// config.getOptional[Int]("docker.container.memorySwappiness").foreach(hostConfigBuilder.memorySwappiness(_)) +// config.getOptional[String]("docker.container.networkMode").foreach(hostConfigBuilder.networkMode) +// config.getOptional[Boolean]("docker.container.privileged").foreach(hostConfigBuilder.privileged(_)) +// hostConfigBuilder.appendBinds( +// Bind +// .from(dockerJobBaseDirectory.resolve(jobBaseDirectory.relativize(jobDirectory)).toAbsolutePath.toString) +// .to("/job") +// .readOnly(false) +// .build() +// ) +// val cacertsFile = jobDirectory.resolve("input").resolve("cacerts") +// val containerConfigBuilder = ContainerConfig +// .builder() +// .hostConfig(hostConfigBuilder.build()) +// .image(dockerImage) +// .cmd("/job") - val report = Json.obj("success" -> false, "errorMessage" -> message) - Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8)) - } - client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) - execution +// val containerConfig = +// if (Files.exists(cacertsFile)) containerConfigBuilder.env(s"REQUESTS_CA_BUNDLE=/job/input/cacerts").build() +// else containerConfigBuilder.build() +// val containerCreation = client.createContainer(containerConfig) +// // Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn) +// +// logger.debug(s"Container configuration: $containerConfig") +// logger.info( +// s"Execute container ${containerCreation.id()}\n" + +// s" timeout: ${timeout.fold("none")(_.toString)}\n" + +// s" image : $dockerImage\n" + +// s" volume : ${jobDirectory.toAbsolutePath}:/job" + +// Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString) +// ) +// +// val timeoutSched = timeout.map(to => +// system.scheduler.scheduleOnce(to) { +// logger.info("Timeout reached, stopping the container") +// client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) +// } +// ) +// val execution = Try { +// client.startContainer(containerCreation.id()) +// client.waitContainer(containerCreation.id()) +// () +// } +// timeoutSched.foreach(_.cancel()) +// val outputFile = jobDirectory.resolve("output").resolve("output.json") +// if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { +// logger.warn(s"The worker didn't generate output file.") +// val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully()) +// .fold(e => s"Container logs can't be read (${e.getMessage})", identity) +// val message = execution.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output) +// +// val report = Json.obj("success" -> false, "errorMessage" -> message) +// Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8)) +// } +// client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) +// execution } } diff --git a/app/org/thp/cortex/util/docker/DockerClient.scala b/app/org/thp/cortex/util/docker/DockerClient.scala new file mode 100644 index 000000000..f3e6deee5 --- /dev/null +++ b/app/org/thp/cortex/util/docker/DockerClient.scala @@ -0,0 +1,167 @@ +package org.thp.cortex.util.docker + +import com.github.dockerjava.api.model._ +import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientConfig, DockerClientImpl} +import com.github.dockerjava.transport.DockerHttpClient +import com.github.dockerjava.zerodep.ZerodepDockerHttpClient +import play.api.{Configuration, Logger} + +import java.nio.file.{Files, Path} +import java.time.Duration +import java.util.concurrent.Executors +import scala.concurrent.duration.FiniteDuration +import scala.jdk.CollectionConverters._ +import scala.util.Try + +class DockerClient(config: Configuration) { + private lazy val logger: Logger = Logger(getClass.getName) + private lazy val (dockerConf, httpClient) = getHttpClient + private lazy val underlyingClient = DockerClientImpl.getInstance(dockerConf, httpClient) + + def execute(containerId: String): Try[String] = Try { + val startContainerCmd = underlyingClient.startContainerCmd(containerId) + startContainerCmd.exec() + val waitResult = underlyingClient + .waitContainerCmd(containerId) + .start() + .awaitStatusCode() + logger.info(s"container $containerId started and awaited with code: $waitResult") + + containerId + } + + def prepare(image: String, jobDirectory: Path, jobBaseDirectory: Path, dockerJobBaseDirectory: Path, timeout: FiniteDuration): Try[String] = Try { + logger.info(s"image $image pull result: ${pullImage(image, timeout)}") + val containerCmd = underlyingClient + .createContainerCmd(image) + .withHostConfig(configure(jobDirectory, jobBaseDirectory, dockerJobBaseDirectory)) + if (Files.exists(jobDirectory.resolve("input").resolve("cacerts"))) + containerCmd.withEnv(s"REQUESTS_CA_BUNDLE=/job/input/cacerts") + val containerResponse = containerCmd.exec() + logger.info( + s"about to start container ${containerResponse.getId}\n" + + s" timeout: ${timeout.toString}\n" + + s" image : $image\n" + + s" volumes : ${jobDirectory.toAbsolutePath}" + ) + if (containerResponse.getWarnings.nonEmpty) logger.warn(s"${containerResponse.getWarnings.mkString(", ")}") + scheduleContainerTimeout(containerResponse.getId, timeout) + + containerResponse.getId + } + + private def configure(jobDirectory: Path, jobBaseDirectory: Path, dockerJobBaseDirectory: Path): HostConfig = { + val hostConfigMut = HostConfig + .newHostConfig() + .withBinds( + Seq( + new Bind( + dockerJobBaseDirectory.resolve(jobBaseDirectory.relativize(jobDirectory)).toAbsolutePath.toString, + new Volume(s"/job"), + AccessMode.rw + ) + ): _* + ) + + config.getOptional[Seq[String]]("docker.container.capAdd").map(_.map(Capability.valueOf)).foreach(hostConfigMut.withCapAdd(_: _*)) + config.getOptional[Seq[String]]("docker.container.capDrop").map(_.map(Capability.valueOf)).foreach(hostConfigMut.withCapDrop(_: _*)) + config.getOptional[String]("docker.container.cgroupParent").foreach(hostConfigMut.withCgroupParent) + config.getOptional[Long]("docker.container.cpuPeriod").foreach(hostConfigMut.withCpuPeriod(_)) + config.getOptional[Long]("docker.container.cpuQuota").foreach(hostConfigMut.withCpuQuota(_)) + config.getOptional[Seq[String]]("docker.container.dns").map(_.asJava).foreach(hostConfigMut.withDns) + config.getOptional[Seq[String]]("docker.container.dnsSearch").map(_.asJava).foreach(hostConfigMut.withDnsSearch) + config.getOptional[Seq[String]]("docker.container.extraHosts").foreach(l => hostConfigMut.withExtraHosts(l: _*)) + config.getOptional[Long]("docker.container.kernelMemory").foreach(hostConfigMut.withKernelMemory(_)) + config.getOptional[Long]("docker.container.memoryReservation").foreach(hostConfigMut.withMemoryReservation(_)) + config.getOptional[Long]("docker.container.memory").foreach(hostConfigMut.withMemory(_)) + config.getOptional[Long]("docker.container.memorySwap").foreach(hostConfigMut.withMemorySwap(_)) + config.getOptional[Long]("docker.container.memorySwappiness").foreach(hostConfigMut.withMemorySwappiness(_)) + config.getOptional[String]("docker.container.networkMode").foreach(hostConfigMut.withNetworkMode) + config.getOptional[Boolean]("docker.container.privileged").foreach(hostConfigMut.withPrivileged(_)) + + hostConfigMut + } + + def info: Info = underlyingClient.infoCmd().exec() + def pullImage(image: String, timeout: FiniteDuration): Boolean = { + val pullImageResultCbk = underlyingClient // Blocking + .pullImageCmd(image) + .start() + .awaitCompletion() + + pullImageResultCbk.awaitCompletion(timeout.length, timeout.unit) + } + + def clean(containerId: String): Try[Unit] = Try { + underlyingClient + .removeContainerCmd(containerId) + .withForce(true) + .exec() + logger.info(s"removed container $containerId") + } + + def getLogs(containerId: String): Try[String] = Try { + val stringBuilder = new StringBuilder() + val callback = new DockerLogsStringBuilder(stringBuilder) + underlyingClient + .logContainerCmd(containerId) + .withStdErr(true) + .withStdOut(true) + .withFollowStream(true) + .withTailAll() + .exec(callback) + .awaitCompletion() + + callback.builder.toString + } + + private def scheduleContainerTimeout(containerId: String, timeout: FiniteDuration) = + Executors + .newSingleThreadScheduledExecutor() + .schedule( + () => { + logger.info(s"timeout $timeout reached, stopping container $containerId}") + underlyingClient.removeContainerCmd(containerId).withForce(true).exec() + }, + timeout.length, + timeout.unit + ) + + private def getHttpClient: (DockerClientConfig, DockerHttpClient) = { + val dockerConf = getBaseConfig + + ( + dockerConf, + new ZerodepDockerHttpClient.Builder() + .dockerHost(dockerConf.getDockerHost) + .sslConfig(dockerConf.getSSLConfig) + .maxConnections(if (config.has("docker.httpClient.maxConnections")) config.get[Int]("docker.httpClient.maxConnections") else 100) + .connectionTimeout( + if (config.has("docker.httpClient.connectionTimeout")) Duration.ofMillis(config.get[Long]("docker.httpClient.connectionTimeout")) + else Duration.ofSeconds(30) + ) + .responseTimeout( + if (config.has("docker.httpClient.responseTimeout")) Duration.ofMillis(config.get[Long]("docker.httpClient.responseTimeout")) + else Duration.ofSeconds(45) + ) + .build() + ) + } + + private def getBaseConfig: DockerClientConfig = { + val confBuilder = DefaultDockerClientConfig.createDefaultConfigBuilder() + if (config.has("docker")) { + if (config.has("docker.host")) confBuilder.withDockerHost(config.get[String]("docker.host")) + if (config.has("docker.tlsVerify")) confBuilder.withDockerTlsVerify(config.get[Boolean]("docker.tlsVerify")) + if (config.has("docker.certPath")) confBuilder.withDockerCertPath(config.get[String]("docker.certPath")) + if (config.has("docker.registry")) { + if (config.has("docker.registry.user")) confBuilder.withRegistryUsername(config.get[String]("docker.registry.user")) + if (config.has("docker.registry.password")) confBuilder.withRegistryPassword(config.get[String]("docker.registry.password")) + if (config.has("docker.registry.email")) confBuilder.withRegistryEmail(config.get[String]("docker.registry.email")) + if (config.has("docker.registry.url")) confBuilder.withRegistryUrl(config.get[String]("docker.registry.url")) + } + } + + confBuilder.build() + } +} diff --git a/app/org/thp/cortex/util/docker/DockerLogsStringBuilder.scala b/app/org/thp/cortex/util/docker/DockerLogsStringBuilder.scala new file mode 100644 index 000000000..c5fda9bc6 --- /dev/null +++ b/app/org/thp/cortex/util/docker/DockerLogsStringBuilder.scala @@ -0,0 +1,11 @@ +package org.thp.cortex.util.docker + +import com.github.dockerjava.api.async.ResultCallback +import com.github.dockerjava.api.model.Frame + +class DockerLogsStringBuilder(var builder: StringBuilder) extends ResultCallback.Adapter[Frame] { + override def onNext(item: Frame): Unit = { + builder.append(new String(item.getPayload)) + super.onNext(item) + } +} diff --git a/build.sbt b/build.sbt index 94b60e4ae..e9f7c3211 100644 --- a/build.sbt +++ b/build.sbt @@ -29,6 +29,8 @@ lazy val cortex = (project in file(".")) Dependencies.reflections, Dependencies.zip4j, Dependencies.dockerClient, + Dependencies.dockerJavaClient, + Dependencies.dockerJavaTransport, Dependencies.akkaCluster, Dependencies.akkaClusterTyped ), diff --git a/conf/application.sample b/conf/application.sample index 72d46febb..c1b741064 100644 --- a/conf/application.sample +++ b/conf/application.sample @@ -219,4 +219,41 @@ responder { # port = 3128 # } +# Docker +docker { + host = "tcp://docker.somewhere.tld:2376" + tlsVerify = false + certPath = "/home/user/.docker" + registry { + user = "username" + password = "pwdReg" + email = "user@docker.com" + url = "https://www.docker-registry.com" + } + httpClient { + maxConnections = 100 + connectionTimeout = 30000 # millis + responseTimeout = 45000 + } + container { + capAdd = ["ALL"] + capDrop = ["NET_ADMIN", "SYS_ADMIN"] + cgroupParent = "m-executor-abcd" + privileged = false + + dns = ["8.8.8.8", "9.9.9.9"] + dnsSearch = ["dc1.example.com", "dc2.example.com"] + extraHosts = ["somehost=162.242.195.82", "otherhost=50.31.209.229", "myhostv6=::1"] + networkMode = "host" + + cpuPeriod = 100000 + cpuQuota = 50000 + kernelMemory = 2147483648 + memoryReservation = 1024 + memory = 4294967296 + memorySwap = 1073741824 + memorySwappiness = 0 + } +} + # It's the end my friend. Happy hunting! diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7d07b526f..490bfaee0 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,7 +1,8 @@ -import sbt._ +import sbt.* object Dependencies { - val scalaVersion = "2.12.16" + val scalaVersion = "2.12.16" + val dockerJavaVersion = "3.4.0" object Play { val version = play.core.PlayVersion.current @@ -17,10 +18,12 @@ object Dependencies { val scalaGuice = "net.codingwell" %% "scala-guice" % "5.1.1" - val reflections = "org.reflections" % "reflections" % "0.10.2" - val zip4j = "net.lingala.zip4j" % "zip4j" % "2.11.5" - val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.6" - val dockerClient = "com.spotify" % "docker-client" % "8.16.0" - val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % play.core.PlayVersion.akkaVersion - val akkaClusterTyped = "com.typesafe.akka" %% "akka-cluster-typed" % play.core.PlayVersion.akkaVersion + val reflections = "org.reflections" % "reflections" % "0.10.2" + val zip4j = "net.lingala.zip4j" % "zip4j" % "2.11.5" + val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.6" + val dockerClient = "com.spotify" % "docker-client" % "8.16.0" + val dockerJavaClient = "com.github.docker-java" % "docker-java" % dockerJavaVersion + val dockerJavaTransport = "com.github.docker-java" % "docker-java-transport-zerodep" % dockerJavaVersion + val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % play.core.PlayVersion.akkaVersion + val akkaClusterTyped = "com.typesafe.akka" %% "akka-cluster-typed" % play.core.PlayVersion.akkaVersion } From c9be7c0c39f4e85560910bd64d256f67c3b69aad Mon Sep 17 00:00:00 2001 From: "alexandre.teilhet" Date: Mon, 7 Oct 2024 15:58:07 +0200 Subject: [PATCH 2/3] [DL-717] feedback Toom --- .../cortex/services/DockerJobRunnerSrv.scala | 88 +----------------- .../thp/cortex/util/docker/DockerClient.scala | 89 ++++++++++--------- conf/application.sample | 2 + 3 files changed, 50 insertions(+), 129 deletions(-) diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala index 45bbef979..1722ad8a8 100644 --- a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala +++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala @@ -1,10 +1,6 @@ package org.thp.cortex.services import akka.actor.ActorSystem -//import com.spotify.docker.client.DockerClient.LogsParam -//import com.spotify.docker.client.messages.HostConfig.Bind -//import com.spotify.docker.client.messages.{ContainerConfig, HostConfig} -//import com.spotify.docker.client.{DefaultDockerClient, DockerClient} import org.thp.cortex.util.docker.{DockerClient => DockerJavaClient} import play.api.libs.json.Json import play.api.{Configuration, Logger} @@ -29,16 +25,6 @@ class DockerJobRunnerSrv( @Inject() def this(config: Configuration, system: ActorSystem) = this( -// new DefaultDockerClient.Builder() -// .apiVersion(config.getOptional[String]("docker.version").orNull) -// .connectionPoolSize(config.getOptional[Int]("docker.connectionPoolSize").getOrElse(100)) -// .connectTimeoutMillis(config.getOptional[Long]("docker.connectTimeoutMillis").getOrElse(5000)) -// //.dockerCertificates() -// .readTimeoutMillis(config.getOptional[Long]("docker.readTimeoutMillis").getOrElse(30000)) -// //.registryAuthSupplier() -// .uri(config.getOptional[String]("docker.uri").getOrElse("unix:///var/run/docker.sock")) -// .useProxy(config.getOptional[Boolean]("docker.useProxy").getOrElse(false)) -// .build(), new DockerJavaClient(config), config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true), Paths.get(config.get[String]("job.directory")), @@ -71,7 +57,7 @@ class DockerJobRunnerSrv( def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit executionContext: ExecutionContext): Try[Unit] = { val to = timeout.getOrElse(FiniteDuration(5000, TimeUnit.SECONDS)) - if (autoUpdate) Try(javaClient.pullImage(dockerImage, to)) + if (autoUpdate) Try(javaClient.pullImage(dockerImage)) for { containerId <- javaClient.prepare(dockerImage, jobDirectory, jobBaseDirectory, dockerJobBaseDirectory, to) @@ -88,78 +74,6 @@ class DockerJobRunnerSrv( _ = if (isError) generateErrorOutput(containerId, outputFile).toOption else None _ <- javaClient.clean(containerId) } yield () - - // import scala.collection.JavaConverters._ - // ContainerConfig.builder().addVolume() -// val hostConfigBuilder = HostConfig.builder() -// config.getOptional[Seq[String]]("docker.container.capAdd").map(_.asJava).foreach(hostConfigBuilder.capAdd) -// config.getOptional[Seq[String]]("docker.container.capDrop").map(_.asJava).foreach(hostConfigBuilder.capDrop) -// config.getOptional[String]("docker.container.cgroupParent").foreach(hostConfigBuilder.cgroupParent) -// config.getOptional[Long]("docker.container.cpuPeriod").foreach(hostConfigBuilder.cpuPeriod(_)) -// config.getOptional[Long]("docker.container.cpuQuota").foreach(hostConfigBuilder.cpuQuota(_)) -// config.getOptional[Seq[String]]("docker.container.dns").map(_.asJava).foreach(hostConfigBuilder.dns) -// config.getOptional[Seq[String]]("docker.container.dnsSearch").map(_.asJava).foreach(hostConfigBuilder.dnsSearch) -// config.getOptional[Seq[String]]("docker.container.extraHosts").map(_.asJava).foreach(hostConfigBuilder.extraHosts) -// config.getOptional[Long]("docker.container.kernelMemory").foreach(hostConfigBuilder.kernelMemory(_)) -// config.getOptional[Long]("docker.container.memoryReservation").foreach(hostConfigBuilder.memoryReservation(_)) -// config.getOptional[Long]("docker.container.memory").foreach(hostConfigBuilder.memory(_)) -// config.getOptional[Long]("docker.container.memorySwap").foreach(hostConfigBuilder.memorySwap(_)) -// config.getOptional[Int]("docker.container.memorySwappiness").foreach(hostConfigBuilder.memorySwappiness(_)) -// config.getOptional[String]("docker.container.networkMode").foreach(hostConfigBuilder.networkMode) -// config.getOptional[Boolean]("docker.container.privileged").foreach(hostConfigBuilder.privileged(_)) -// hostConfigBuilder.appendBinds( -// Bind -// .from(dockerJobBaseDirectory.resolve(jobBaseDirectory.relativize(jobDirectory)).toAbsolutePath.toString) -// .to("/job") -// .readOnly(false) -// .build() -// ) -// val cacertsFile = jobDirectory.resolve("input").resolve("cacerts") -// val containerConfigBuilder = ContainerConfig -// .builder() -// .hostConfig(hostConfigBuilder.build()) -// .image(dockerImage) -// .cmd("/job") - -// val containerConfig = -// if (Files.exists(cacertsFile)) containerConfigBuilder.env(s"REQUESTS_CA_BUNDLE=/job/input/cacerts").build() -// else containerConfigBuilder.build() -// val containerCreation = client.createContainer(containerConfig) -// // Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn) -// -// logger.debug(s"Container configuration: $containerConfig") -// logger.info( -// s"Execute container ${containerCreation.id()}\n" + -// s" timeout: ${timeout.fold("none")(_.toString)}\n" + -// s" image : $dockerImage\n" + -// s" volume : ${jobDirectory.toAbsolutePath}:/job" + -// Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString) -// ) -// -// val timeoutSched = timeout.map(to => -// system.scheduler.scheduleOnce(to) { -// logger.info("Timeout reached, stopping the container") -// client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) -// } -// ) -// val execution = Try { -// client.startContainer(containerCreation.id()) -// client.waitContainer(containerCreation.id()) -// () -// } -// timeoutSched.foreach(_.cancel()) -// val outputFile = jobDirectory.resolve("output").resolve("output.json") -// if (!Files.exists(outputFile) || Files.size(outputFile) == 0) { -// logger.warn(s"The worker didn't generate output file.") -// val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully()) -// .fold(e => s"Container logs can't be read (${e.getMessage})", identity) -// val message = execution.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output) -// -// val report = Json.obj("success" -> false, "errorMessage" -> message) -// Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8)) -// } -// client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill()) -// execution } } diff --git a/app/org/thp/cortex/util/docker/DockerClient.scala b/app/org/thp/cortex/util/docker/DockerClient.scala index f3e6deee5..0537106c2 100644 --- a/app/org/thp/cortex/util/docker/DockerClient.scala +++ b/app/org/thp/cortex/util/docker/DockerClient.scala @@ -8,7 +8,8 @@ import play.api.{Configuration, Logger} import java.nio.file.{Files, Path} import java.time.Duration -import java.util.concurrent.Executors +import java.util.concurrent.{Executors, TimeUnit} +import scala.concurrent.blocking import scala.concurrent.duration.FiniteDuration import scala.jdk.CollectionConverters._ import scala.util.Try @@ -18,20 +19,26 @@ class DockerClient(config: Configuration) { private lazy val (dockerConf, httpClient) = getHttpClient private lazy val underlyingClient = DockerClientImpl.getInstance(dockerConf, httpClient) - def execute(containerId: String): Try[String] = Try { - val startContainerCmd = underlyingClient.startContainerCmd(containerId) - startContainerCmd.exec() - val waitResult = underlyingClient - .waitContainerCmd(containerId) - .start() - .awaitStatusCode() - logger.info(s"container $containerId started and awaited with code: $waitResult") - - containerId - } + def execute(containerId: String): Try[Int] = + Try { + val startContainerCmd = underlyingClient.startContainerCmd(containerId) + startContainerCmd.exec() + val waitResult = underlyingClient + .waitContainerCmd(containerId) + .start() + .awaitStatusCode() + logger.info(s"container $containerId started and awaited with code: $waitResult") + + 0 + } recover { + case e => + logger.error(s"execute container $containerId failed", e) + + 1 + } def prepare(image: String, jobDirectory: Path, jobBaseDirectory: Path, dockerJobBaseDirectory: Path, timeout: FiniteDuration): Try[String] = Try { - logger.info(s"image $image pull result: ${pullImage(image, timeout)}") + logger.info(s"image $image pull result: ${pullImage(image)}") val containerCmd = underlyingClient .createContainerCmd(image) .withHostConfig(configure(jobDirectory, jobBaseDirectory, dockerJobBaseDirectory)) @@ -83,16 +90,20 @@ class DockerClient(config: Configuration) { } def info: Info = underlyingClient.infoCmd().exec() - def pullImage(image: String, timeout: FiniteDuration): Boolean = { + def pullImage(image: String): Boolean = blocking { val pullImageResultCbk = underlyingClient // Blocking .pullImageCmd(image) .start() .awaitCompletion() + val timeout = config.getOptional[Long]("docker.pullImageTimeout") - pullImageResultCbk.awaitCompletion(timeout.length, timeout.unit) + pullImageResultCbk.awaitCompletion(timeout.getOrElse(10000), TimeUnit.MILLISECONDS) } def clean(containerId: String): Try[Unit] = Try { + underlyingClient + .killContainerCmd(containerId) + .exec() underlyingClient .removeContainerCmd(containerId) .withForce(true) @@ -129,38 +140,32 @@ class DockerClient(config: Configuration) { private def getHttpClient: (DockerClientConfig, DockerHttpClient) = { val dockerConf = getBaseConfig + val dockerClient = new ZerodepDockerHttpClient.Builder() + .dockerHost(dockerConf.getDockerHost) + .sslConfig(dockerConf.getSSLConfig) + .maxConnections(if (config.has("docker.httpClient.maxConnections")) config.get[Int]("docker.httpClient.maxConnections") else 100) + .connectionTimeout( + if (config.has("docker.httpClient.connectionTimeout")) Duration.ofMillis(config.get[Long]("docker.httpClient.connectionTimeout")) + else Duration.ofSeconds(30) + ) + .responseTimeout( + if (config.has("docker.httpClient.responseTimeout")) Duration.ofMillis(config.get[Long]("docker.httpClient.responseTimeout")) + else Duration.ofSeconds(45) + ) + .build() - ( - dockerConf, - new ZerodepDockerHttpClient.Builder() - .dockerHost(dockerConf.getDockerHost) - .sslConfig(dockerConf.getSSLConfig) - .maxConnections(if (config.has("docker.httpClient.maxConnections")) config.get[Int]("docker.httpClient.maxConnections") else 100) - .connectionTimeout( - if (config.has("docker.httpClient.connectionTimeout")) Duration.ofMillis(config.get[Long]("docker.httpClient.connectionTimeout")) - else Duration.ofSeconds(30) - ) - .responseTimeout( - if (config.has("docker.httpClient.responseTimeout")) Duration.ofMillis(config.get[Long]("docker.httpClient.responseTimeout")) - else Duration.ofSeconds(45) - ) - .build() - ) + (dockerConf, dockerClient) } private def getBaseConfig: DockerClientConfig = { val confBuilder = DefaultDockerClientConfig.createDefaultConfigBuilder() - if (config.has("docker")) { - if (config.has("docker.host")) confBuilder.withDockerHost(config.get[String]("docker.host")) - if (config.has("docker.tlsVerify")) confBuilder.withDockerTlsVerify(config.get[Boolean]("docker.tlsVerify")) - if (config.has("docker.certPath")) confBuilder.withDockerCertPath(config.get[String]("docker.certPath")) - if (config.has("docker.registry")) { - if (config.has("docker.registry.user")) confBuilder.withRegistryUsername(config.get[String]("docker.registry.user")) - if (config.has("docker.registry.password")) confBuilder.withRegistryPassword(config.get[String]("docker.registry.password")) - if (config.has("docker.registry.email")) confBuilder.withRegistryEmail(config.get[String]("docker.registry.email")) - if (config.has("docker.registry.url")) confBuilder.withRegistryUrl(config.get[String]("docker.registry.url")) - } - } + config.getOptional[String]("docker.host").foreach(confBuilder.withDockerHost) + config.getOptional[Boolean]("docker.tlsVerify").foreach(confBuilder.withDockerTlsVerify(_)) + config.getOptional[String]("docker.certPath").foreach(confBuilder.withDockerCertPath) + config.getOptional[String]("docker.registry.user").foreach(confBuilder.withRegistryUsername) + config.getOptional[String]("docker.registry.password").foreach(confBuilder.withRegistryPassword) + config.getOptional[String]("docker.registry.email").foreach(confBuilder.withRegistryEmail) + config.getOptional[String]("docker.registry.url").foreach(confBuilder.withRegistryUrl) confBuilder.build() } diff --git a/conf/application.sample b/conf/application.sample index c1b741064..6b301ddcd 100644 --- a/conf/application.sample +++ b/conf/application.sample @@ -254,6 +254,8 @@ docker { memorySwap = 1073741824 memorySwappiness = 0 } + autoUpdate = false + pullImageTimeout = 10000 } # It's the end my friend. Happy hunting! From 7103b71c4c3dc34b50e023c5f891506cede55689 Mon Sep 17 00:00:00 2001 From: "alexandre.teilhet" Date: Tue, 8 Oct 2024 11:00:38 +0200 Subject: [PATCH 3/3] [DL-717] feedback Toom 2 --- app/org/thp/cortex/util/docker/DockerClient.scala | 11 +++-------- conf/application.sample | 2 +- 2 files changed, 4 insertions(+), 9 deletions(-) diff --git a/app/org/thp/cortex/util/docker/DockerClient.scala b/app/org/thp/cortex/util/docker/DockerClient.scala index 0537106c2..e7df3f9e5 100644 --- a/app/org/thp/cortex/util/docker/DockerClient.scala +++ b/app/org/thp/cortex/util/docker/DockerClient.scala @@ -29,12 +29,7 @@ class DockerClient(config: Configuration) { .awaitStatusCode() logger.info(s"container $containerId started and awaited with code: $waitResult") - 0 - } recover { - case e => - logger.error(s"execute container $containerId failed", e) - - 1 + waitResult } def prepare(image: String, jobDirectory: Path, jobBaseDirectory: Path, dockerJobBaseDirectory: Path, timeout: FiniteDuration): Try[String] = Try { @@ -95,9 +90,9 @@ class DockerClient(config: Configuration) { .pullImageCmd(image) .start() .awaitCompletion() - val timeout = config.getOptional[Long]("docker.pullImageTimeout") + val timeout = config.get[FiniteDuration]("docker.pullImageTimeout") - pullImageResultCbk.awaitCompletion(timeout.getOrElse(10000), TimeUnit.MILLISECONDS) + pullImageResultCbk.awaitCompletion(timeout.toMillis, TimeUnit.MILLISECONDS) } def clean(containerId: String): Try[Unit] = Try { diff --git a/conf/application.sample b/conf/application.sample index 6b301ddcd..ecb9c2cc9 100644 --- a/conf/application.sample +++ b/conf/application.sample @@ -255,7 +255,7 @@ docker { memorySwappiness = 0 } autoUpdate = false - pullImageTimeout = 10000 + pullImageTimeout = 10 minutes } # It's the end my friend. Happy hunting!