diff --git a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala
index 298e4f167..1722ad8a8 100644
--- a/app/org/thp/cortex/services/DockerJobRunnerSrv.scala
+++ b/app/org/thp/cortex/services/DockerJobRunnerSrv.scala
@@ -1,15 +1,13 @@
 package org.thp.cortex.services
 
 import akka.actor.ActorSystem
-import com.spotify.docker.client.DockerClient.LogsParam
-import com.spotify.docker.client.messages.HostConfig.Bind
-import com.spotify.docker.client.messages.{ContainerConfig, HostConfig}
-import com.spotify.docker.client.{DefaultDockerClient, DockerClient}
+import org.thp.cortex.util.docker.{DockerClient => DockerJavaClient}
 import play.api.libs.json.Json
 import play.api.{Configuration, Logger}
 
 import java.nio.charset.StandardCharsets
 import java.nio.file._
+import java.util.concurrent.TimeUnit
 import javax.inject.{Inject, Singleton}
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration.FiniteDuration
@@ -17,8 +15,7 @@ import scala.util.Try
 
 @Singleton
 class DockerJobRunnerSrv(
-    client: DockerClient,
-    config: Configuration,
+    javaClient: DockerJavaClient,
     autoUpdate: Boolean,
     jobBaseDirectory: Path,
     dockerJobBaseDirectory: Path,
@@ -28,17 +25,7 @@ class DockerJobRunnerSrv(
   @Inject()
   def this(config: Configuration, system: ActorSystem) =
     this(
-      new DefaultDockerClient.Builder()
-        .apiVersion(config.getOptional[String]("docker.version").orNull)
-        .connectionPoolSize(config.getOptional[Int]("docker.connectionPoolSize").getOrElse(100))
-        .connectTimeoutMillis(config.getOptional[Long]("docker.connectTimeoutMillis").getOrElse(5000))
-        //.dockerCertificates()
-        .readTimeoutMillis(config.getOptional[Long]("docker.readTimeoutMillis").getOrElse(30000))
-        //.registryAuthSupplier()
-        .uri(config.getOptional[String]("docker.uri").getOrElse("unix:///var/run/docker.sock"))
-        .useProxy(config.getOptional[Boolean]("docker.useProxy").getOrElse(false))
-        .build(),
-      config,
+      new DockerJavaClient(config),
       config.getOptional[Boolean]("docker.autoUpdate").getOrElse(true),
       Paths.get(config.get[String]("job.directory")),
       Paths.get(config.get[String]("job.dockerDirectory")),
@@ -50,7 +37,7 @@ class DockerJobRunnerSrv(
   lazy val isAvailable: Boolean =
     Try {
       logger.debug(s"Retrieve docker information ...")
-      logger.info(s"Docker is available:\n${client.info()}")
+      logger.info(s"Docker is available:\n${javaClient.info}")
       true
     }.recover {
       case error =>
@@ -58,81 +45,67 @@ class DockerJobRunnerSrv(
         false
     }.get
 
-  def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit
-      ec: ExecutionContext
-  ): Try[Unit] = {
-    import scala.collection.JavaConverters._
-    if (autoUpdate) Try(client.pull(dockerImage))
-    // ContainerConfig.builder().addVolume()
-    val hostConfigBuilder = HostConfig.builder()
-    config.getOptional[Seq[String]]("docker.container.capAdd").map(_.asJava).foreach(hostConfigBuilder.capAdd)
-    config.getOptional[Seq[String]]("docker.container.capDrop").map(_.asJava).foreach(hostConfigBuilder.capDrop)
-    config.getOptional[String]("docker.container.cgroupParent").foreach(hostConfigBuilder.cgroupParent)
-    config.getOptional[Long]("docker.container.cpuPeriod").foreach(hostConfigBuilder.cpuPeriod(_))
-    config.getOptional[Long]("docker.container.cpuQuota").foreach(hostConfigBuilder.cpuQuota(_))
-    config.getOptional[Seq[String]]("docker.container.dns").map(_.asJava).foreach(hostConfigBuilder.dns)
-    config.getOptional[Seq[String]]("docker.container.dnsSearch").map(_.asJava).foreach(hostConfigBuilder.dnsSearch)
-    config.getOptional[Seq[String]]("docker.container.extraHosts").map(_.asJava).foreach(hostConfigBuilder.extraHosts)
-    config.getOptional[Long]("docker.container.kernelMemory").foreach(hostConfigBuilder.kernelMemory(_))
-    config.getOptional[Long]("docker.container.memoryReservation").foreach(hostConfigBuilder.memoryReservation(_))
-    config.getOptional[Long]("docker.container.memory").foreach(hostConfigBuilder.memory(_))
-    config.getOptional[Long]("docker.container.memorySwap").foreach(hostConfigBuilder.memorySwap(_))
-    config.getOptional[Int]("docker.container.memorySwappiness").foreach(hostConfigBuilder.memorySwappiness(_))
-    config.getOptional[String]("docker.container.networkMode").foreach(hostConfigBuilder.networkMode)
-    config.getOptional[Boolean]("docker.container.privileged").foreach(hostConfigBuilder.privileged(_))
-    hostConfigBuilder.appendBinds(
-      Bind
-        .from(dockerJobBaseDirectory.resolve(jobBaseDirectory.relativize(jobDirectory)).toAbsolutePath.toString)
-        .to("/job")
-        .readOnly(false)
-        .build()
-    )
-    val cacertsFile = jobDirectory.resolve("input").resolve("cacerts")
-    val containerConfigBuilder = ContainerConfig
-      .builder()
-      .hostConfig(hostConfigBuilder.build())
-      .image(dockerImage)
-      .cmd("/job")
-
-    val containerConfig =
-      if (Files.exists(cacertsFile)) containerConfigBuilder.env(s"REQUESTS_CA_BUNDLE=/job/input/cacerts").build()
-      else containerConfigBuilder.build()
-    val containerCreation = client.createContainer(containerConfig)
-    // Option(containerCreation.warnings()).flatMap(_.asScala).foreach(logger.warn)
-
-    logger.debug(s"Container configuration: $containerConfig")
-    logger.info(
-      s"Execute container ${containerCreation.id()}\n" +
-        s" timeout: ${timeout.fold("none")(_.toString)}\n" +
-        s" image : $dockerImage\n" +
-        s" volume : ${jobDirectory.toAbsolutePath}:/job" +
-        Option(containerConfig.env()).fold("")(_.asScala.map("\n env : " + _).mkString)
-    )
-
-    val timeoutSched = timeout.map(to =>
-      system.scheduler.scheduleOnce(to) {
-        logger.info("Timeout reached, stopping the container")
-        client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill())
-      }
-    )
-    val execution = Try {
-      client.startContainer(containerCreation.id())
-      client.waitContainer(containerCreation.id())
-      ()
-    }
-    timeoutSched.foreach(_.cancel())
-    val outputFile = jobDirectory.resolve("output").resolve("output.json")
-    if (!Files.exists(outputFile) || Files.size(outputFile) == 0) {
-      logger.warn(s"The worker didn't generate output file.")
-      val output = Try(client.logs(containerCreation.id(), LogsParam.stdout(), LogsParam.stderr()).readFully())
-        .fold(e => s"Container logs can't be read (${e.getMessage})", identity)
-      val message = execution.fold(e => s"Docker creation error: ${e.getMessage}\n$output", _ => output)
-
-      val report = Json.obj("success" -> false, "errorMessage" -> message)
-      Files.write(outputFile, report.toString.getBytes(StandardCharsets.UTF_8))
-    }
-    client.removeContainer(containerCreation.id(), DockerClient.RemoveContainerParam.forceKill())
-    execution
+  /** Writes a failure report into `f` when the worker left no usable output file.
+    *
+    * The container logs become the error message. Unreadable logs and a failed execution are recorded in
+    * that message as well, so a report is still written in those cases.
+    *
+    * @param containerId    container whose logs are collected
+    * @param f              path of the report file to write
+    * @param executionError failure returned by the container execution, if any
+    * @return the written report, or the failure raised while writing it
+    */
+  private def generateErrorOutput(containerId: String, f: Path, executionError: Option[Throwable] = None) = {
+    logger.warn(s"the runner didn't generate any output file $f")
+    val logs    = javaClient.getLogs(containerId).fold(e => s"Container logs can't be read (${e.getMessage})", identity)
+    val message = executionError.fold(logs)(e => s"Docker execution error: ${e.getMessage}\n$logs")
+    val report  = Json.obj("success" -> false, "errorMessage" -> message)
+    Try(Files.write(f, report.toString.getBytes(StandardCharsets.UTF_8))).map(_ => report)
+  }
+
+  /** Runs `dockerImage` as a container for the given job, then removes the container.
+    *
+    * Once the container has been created, the timeout is cancelled, the error report is written (when the
+    * worker produced no output) and the container is removed whether or not the execution succeeded.
+    *
+    * @param jobDirectory directory of the job; the worker output is read from `output/output.json` inside it
+    * @param dockerImage  image to run
+    * @param timeout      optional delay after which the container is removed
+    * @return a failure if the container cannot be prepared, executed or removed
+    */
+  def run(jobDirectory: Path, dockerImage: String, timeout: Option[FiniteDuration])(implicit executionContext: ExecutionContext): Try[Unit] = {
+    // Without an explicit timeout, 5000 seconds is the limit handed to the docker client.
+    val containerTimeout = timeout.getOrElse(FiniteDuration(5000, TimeUnit.SECONDS))
+
+    // Best effort: the outcome of the pull is deliberately ignored.
+    if (autoUpdate) Try(javaClient.pullImage(dockerImage))
+
+    javaClient.prepare(dockerImage, jobDirectory, jobBaseDirectory, dockerJobBaseDirectory, containerTimeout).flatMap { containerId =>
+      val timeoutScheduled = timeout.map(delay =>
+        system.scheduler.scheduleOnce(delay) {
+          logger.info("Timeout reached, stopping the container")
+          javaClient.clean(containerId)
+        }
+      )
+      // Keep the outcome as a value: short-circuiting on a failure here would skip the cancellation, the
+      // error report and the removal of the container.
+      val execution = javaClient.execute(containerId)
+      timeoutScheduled.foreach(_.cancel())
+
+      val outputFile = jobDirectory.resolve("output").resolve("output.json")
+      // An output file that cannot be inspected is handled like a missing one.
+      val isError =
+        Try(Files.notExists(outputFile) || Files.size(outputFile) == 0 || Files.isDirectory(outputFile)).getOrElse(true)
+      if (isError)
+        generateErrorOutput(containerId, outputFile, execution.failed.toOption).failed
+          .foreach(e => logger.warn(s"unable to write the error report $outputFile", e))
+
+      val cleaned = javaClient.clean(containerId)
+      for {
+        _ <- execution
+        _ <- cleaned
+      } yield ()
+    }
   }
 
 }
diff --git
a/app/org/thp/cortex/util/docker/DockerClient.scala b/app/org/thp/cortex/util/docker/DockerClient.scala
new file mode 100644
index 000000000..e7df3f9e5
--- /dev/null
+++ b/app/org/thp/cortex/util/docker/DockerClient.scala
@@ -0,0 +1,209 @@
+package org.thp.cortex.util.docker
+
+import com.github.dockerjava.api.exception.NotFoundException
+import com.github.dockerjava.api.model._
+import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientConfig, DockerClientImpl}
+import com.github.dockerjava.transport.DockerHttpClient
+import com.github.dockerjava.zerodep.ZerodepDockerHttpClient
+import play.api.{Configuration, Logger}
+
+import java.nio.file.{Files, Path}
+import java.time.Duration
+import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
+import scala.concurrent.blocking
+import scala.concurrent.duration.FiniteDuration
+import scala.jdk.CollectionConverters._
+import scala.util.Try
+
+/** Wrapper around the docker-java client, configured from the `docker.*` settings. */
+class DockerClient(config: Configuration) {
+  private lazy val logger: Logger           = Logger(getClass.getName)
+  private lazy val (dockerConf, httpClient) = getHttpClient
+  private lazy val underlyingClient         = DockerClientImpl.getInstance(dockerConf, httpClient)
+
+  // A single daemon thread serves every container timeout, so running a job never leaves a thread behind.
+  private lazy val timeoutScheduler: ScheduledExecutorService =
+    Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
+      override def newThread(task: Runnable): Thread = {
+        val thread = new Thread(task, "docker-container-timeout")
+        thread.setDaemon(true)
+        thread
+      }
+    })
+
+  /** Starts the container and blocks until it terminates.
+    *
+    * @return the status code obtained while waiting for the container
+    */
+  def execute(containerId: String): Try[Int] =
+    Try {
+      val startContainerCmd = underlyingClient.startContainerCmd(containerId)
+      startContainerCmd.exec()
+      val waitResult = underlyingClient
+        .waitContainerCmd(containerId)
+        .start()
+        .awaitStatusCode()
+      logger.info(s"container $containerId started and awaited with code: $waitResult")
+
+      waitResult
+    }
+
+  /** Creates the container running `image`, with the job directory bound to `/job`, and schedules its
+    * removal once `timeout` has elapsed.
+    *
+    * @return the identifier of the created container
+    */
+  def prepare(image: String, jobDirectory: Path, jobBaseDirectory: Path, dockerJobBaseDirectory: Path, timeout: FiniteDuration): Try[String] = Try {
+    // Best effort: after a failed pull the image may still be available locally. If it is not, the
+    // creation of the container below fails.
+    // NOTE(review): the image is pulled here whatever `docker.autoUpdate` says - confirm this is intended.
+    Try(pullImage(image)).fold(
+      e => logger.warn(s"image $image pull failed: ${e.getMessage}"),
+      pulled => logger.info(s"image $image pull result: $pulled")
+    )
+    val containerCmd = underlyingClient
+      .createContainerCmd(image)
+      .withHostConfig(configure(jobDirectory, jobBaseDirectory, dockerJobBaseDirectory))
+      .withCmd("/job") // container command: the mount point of the job directory
+    if (Files.exists(jobDirectory.resolve("input").resolve("cacerts")))
+      containerCmd.withEnv(s"REQUESTS_CA_BUNDLE=/job/input/cacerts")
+    val containerResponse = containerCmd.exec()
+    logger.info(
+      s"about to start container ${containerResponse.getId}\n" +
+        s" timeout: ${timeout.toString}\n" +
+        s" image : $image\n" +
+        s" volumes : ${jobDirectory.toAbsolutePath}"
+    )
+    // The warnings come as a plain Java array: guard against null before inspecting it.
+    Option(containerResponse.getWarnings).filter(_.nonEmpty).foreach(warnings => logger.warn(warnings.mkString(", ")))
+    scheduleContainerTimeout(containerResponse.getId, timeout)
+
+    containerResponse.getId
+  }
+
+  /** Builds the host configuration: the bind of the job directory plus the optional `docker.container.*` settings. */
+  private def configure(jobDirectory: Path, jobBaseDirectory: Path, dockerJobBaseDirectory: Path): HostConfig = {
+    val hostConfigMut = HostConfig
+      .newHostConfig()
+      .withBinds(
+        Seq(
+          new Bind(
+            dockerJobBaseDirectory.resolve(jobBaseDirectory.relativize(jobDirectory)).toAbsolutePath.toString,
+            new Volume(s"/job"),
+            AccessMode.rw
+          )
+        ): _*
+      )
+
+    config.getOptional[Seq[String]]("docker.container.capAdd").map(_.map(Capability.valueOf)).foreach(hostConfigMut.withCapAdd(_: _*))
+    config.getOptional[Seq[String]]("docker.container.capDrop").map(_.map(Capability.valueOf)).foreach(hostConfigMut.withCapDrop(_: _*))
+    config.getOptional[String]("docker.container.cgroupParent").foreach(hostConfigMut.withCgroupParent)
+    config.getOptional[Long]("docker.container.cpuPeriod").foreach(hostConfigMut.withCpuPeriod(_))
+    config.getOptional[Long]("docker.container.cpuQuota").foreach(hostConfigMut.withCpuQuota(_))
+    config.getOptional[Seq[String]]("docker.container.dns").map(_.asJava).foreach(hostConfigMut.withDns)
+    config.getOptional[Seq[String]]("docker.container.dnsSearch").map(_.asJava).foreach(hostConfigMut.withDnsSearch)
+    config.getOptional[Seq[String]]("docker.container.extraHosts").foreach(l => hostConfigMut.withExtraHosts(l: _*))
+    config.getOptional[Long]("docker.container.kernelMemory").foreach(hostConfigMut.withKernelMemory(_))
+    config.getOptional[Long]("docker.container.memoryReservation").foreach(hostConfigMut.withMemoryReservation(_))
+    config.getOptional[Long]("docker.container.memory").foreach(hostConfigMut.withMemory(_))
+    config.getOptional[Long]("docker.container.memorySwap").foreach(hostConfigMut.withMemorySwap(_))
+    config.getOptional[Long]("docker.container.memorySwappiness").foreach(hostConfigMut.withMemorySwappiness(_))
+    config.getOptional[String]("docker.container.networkMode").foreach(hostConfigMut.withNetworkMode)
+    config.getOptional[Boolean]("docker.container.privileged").foreach(hostConfigMut.withPrivileged(_))
+
+    hostConfigMut
+  }
+
+  /** Returns the information reported by the docker daemon. */
+  def info: Info = underlyingClient.infoCmd().exec()
+
+  /** Pulls `image`, waiting at most `docker.pullImageTimeout` (10 minutes when the setting is absent).
+    *
+    * @return whether the pull completed within the timeout
+    */
+  def pullImage(image: String): Boolean = blocking {
+    val timeout  = config.getOptional[FiniteDuration]("docker.pullImageTimeout").getOrElse(FiniteDuration(10, TimeUnit.MINUTES))
+    val callback = underlyingClient.pullImageCmd(image).start()
+    // Only the bounded wait is used: an unbounded awaitCompletion() beforehand would make the timeout moot.
+    try callback.awaitCompletion(timeout.toMillis, TimeUnit.MILLISECONDS)
+    finally callback.close() // releases the response stream, including when the timeout is reached
+  }
+
+  /** Removes the container, killing it first when it is still running. A container that no longer exists
+    * is not an error.
+    */
+  def clean(containerId: String): Try[Unit] =
+    Try {
+      // A forced removal also handles a running container; a separate kill fails on a container that has
+      // already exited, which would leave it behind.
+      underlyingClient
+        .removeContainerCmd(containerId)
+        .withForce(true)
+        .exec()
+      logger.info(s"removed container $containerId")
+    }.recover {
+      case _: NotFoundException => logger.debug(s"container $containerId is already removed")
+    }
+
+  /** Returns what the container has written to stdout and stderr so far. */
+  def getLogs(containerId: String): Try[String] = Try {
+    val stringBuilder = new StringBuilder()
+    val callback      = new DockerLogsStringBuilder(stringBuilder)
+    underlyingClient
+      .logContainerCmd(containerId)
+      .withStdErr(true)
+      .withStdOut(true)
+      .withFollowStream(false) // do not wait for more output: a running container would block the caller
+      .withTailAll()
+      .exec(callback)
+      .awaitCompletion()
+
+    callback.builder.toString
+  }
+
+  /** Schedules the forced removal of the container once `timeout` has elapsed. */
+  private def scheduleContainerTimeout(containerId: String, timeout: FiniteDuration) =
+    timeoutScheduler.schedule(
+      new Runnable {
+        override def run(): Unit = {
+          logger.info(s"timeout $timeout reached, stopping container $containerId")
+          // clean() treats an already removed container as a success, so only real failures are reported.
+          clean(containerId).failed.foreach(e => logger.warn(s"unable to remove container $containerId: ${e.getMessage}"))
+        }
+      },
+      timeout.length,
+      timeout.unit
+    )
+
+  /** Builds the docker configuration and the HTTP client used to reach the daemon. */
+  private def getHttpClient: (DockerClientConfig, DockerHttpClient) = {
+    val dockerConf = getBaseConfig
+    val dockerClient = new ZerodepDockerHttpClient.Builder()
+      .dockerHost(dockerConf.getDockerHost)
+      .sslConfig(dockerConf.getSSLConfig)
+      .maxConnections(config.getOptional[Int]("docker.httpClient.maxConnections").getOrElse(100))
+      .connectionTimeout(
+        config.getOptional[Long]("docker.httpClient.connectionTimeout").map(Duration.ofMillis(_)).getOrElse(Duration.ofSeconds(30))
+      )
+      .responseTimeout(
+        config.getOptional[Long]("docker.httpClient.responseTimeout").map(Duration.ofMillis(_)).getOrElse(Duration.ofSeconds(45))
+      )
+      .build()
+
+    (dockerConf, dockerClient)
+  }
+
+  /** Builds the docker-java configuration from the optional `docker.*` connection and registry settings. */
+  private def getBaseConfig: DockerClientConfig = {
+    val confBuilder = DefaultDockerClientConfig.createDefaultConfigBuilder()
+    config.getOptional[String]("docker.host").foreach(confBuilder.withDockerHost)
+    config.getOptional[Boolean]("docker.tlsVerify").foreach(confBuilder.withDockerTlsVerify(_))
+    config.getOptional[String]("docker.certPath").foreach(confBuilder.withDockerCertPath)
+    config.getOptional[String]("docker.registry.user").foreach(confBuilder.withRegistryUsername)
+    config.getOptional[String]("docker.registry.password").foreach(confBuilder.withRegistryPassword)
+    config.getOptional[String]("docker.registry.email").foreach(confBuilder.withRegistryEmail)
+    config.getOptional[String]("docker.registry.url").foreach(confBuilder.withRegistryUrl)
+
+    confBuilder.build()
+  }
+}
diff --git a/app/org/thp/cortex/util/docker/DockerLogsStringBuilder.scala b/app/org/thp/cortex/util/docker/DockerLogsStringBuilder.scala
new file mode 100644
index 000000000..c5fda9bc6
--- /dev/null
+++ b/app/org/thp/cortex/util/docker/DockerLogsStringBuilder.scala
@@ -0,0 +1,15 @@
+package org.thp.cortex.util.docker
+
+import com.github.dockerjava.api.async.ResultCallback
+import com.github.dockerjava.api.model.Frame
+
+import java.nio.charset.StandardCharsets
+
+/** Log callback that appends the payload of every received frame to `builder`. */
+class DockerLogsStringBuilder(val builder: StringBuilder) extends ResultCallback.Adapter[Frame] {
+  override def onNext(item: Frame): Unit = {
+    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
+    builder.append(new String(item.getPayload, StandardCharsets.UTF_8))
+    super.onNext(item)
+  }
+}
diff --git a/build.sbt b/build.sbt
index 94b60e4ae..e9f7c3211 100644
--- a/build.sbt
+++ b/build.sbt
@@ -29,6 +29,8 @@ lazy val cortex = (project in file("."))
       Dependencies.reflections,
       Dependencies.zip4j,
       Dependencies.dockerClient,
+      Dependencies.dockerJavaClient,
+      Dependencies.dockerJavaTransport,
       Dependencies.akkaCluster,
       Dependencies.akkaClusterTyped
     ),
diff --git a/conf/application.sample b/conf/application.sample
index 72d46febb..ecb9c2cc9 100644
--- a/conf/application.sample
+++ b/conf/application.sample
@@ -219,4 +219,43 @@ responder {
 #   port = 3128
 # }
 
+# Docker
+docker {
+  host = "tcp://docker.somewhere.tld:2376"
+  tlsVerify = false
+  certPath = "/home/user/.docker"
+  registry {
+    user = "username"
+    password = "pwdReg"
+    email = "user@docker.com"
+    url = "https://www.docker-registry.com"
+  }
+  httpClient {
+    maxConnections = 100
+    connectionTimeout = 30000 # millis
+    responseTimeout = 45000
+  }
+  container {
+    capAdd = ["ALL"]
+    capDrop = ["NET_ADMIN", "SYS_ADMIN"]
+    cgroupParent = "m-executor-abcd"
+    privileged = false
+
+    dns = ["8.8.8.8", "9.9.9.9"]
+    dnsSearch = ["dc1.example.com", "dc2.example.com"]
+    extraHosts = ["somehost:162.242.195.82", "otherhost:50.31.209.229", "myhostv6:::1"]
+    networkMode = "host"
+
+    cpuPeriod = 100000
+    cpuQuota = 50000
+    kernelMemory = 2147483648
+    memoryReservation = 1024
+    memory = 4294967296
+    memorySwap = 1073741824
+    memorySwappiness = 0
+  }
+  autoUpdate = false
+  pullImageTimeout = 10 minutes
+}
+
 # It's the end my friend. Happy hunting!
diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7d07b526f..490bfaee0 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -1,7 +1,8 @@ -import sbt._ +import sbt.* object Dependencies { - val scalaVersion = "2.12.16" + val scalaVersion = "2.12.16" + val dockerJavaVersion = "3.4.0" object Play { val version = play.core.PlayVersion.current @@ -17,10 +18,12 @@ object Dependencies { val scalaGuice = "net.codingwell" %% "scala-guice" % "5.1.1" - val reflections = "org.reflections" % "reflections" % "0.10.2" - val zip4j = "net.lingala.zip4j" % "zip4j" % "2.11.5" - val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.6" - val dockerClient = "com.spotify" % "docker-client" % "8.16.0" - val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % play.core.PlayVersion.akkaVersion - val akkaClusterTyped = "com.typesafe.akka" %% "akka-cluster-typed" % play.core.PlayVersion.akkaVersion + val reflections = "org.reflections" % "reflections" % "0.10.2" + val zip4j = "net.lingala.zip4j" % "zip4j" % "2.11.5" + val elastic4play = "org.thehive-project" %% "elastic4play" % "1.13.6" + val dockerClient = "com.spotify" % "docker-client" % "8.16.0" + val dockerJavaClient = "com.github.docker-java" % "docker-java" % dockerJavaVersion + val dockerJavaTransport = "com.github.docker-java" % "docker-java-transport-zerodep" % dockerJavaVersion + val akkaCluster = "com.typesafe.akka" %% "akka-cluster" % play.core.PlayVersion.akkaVersion + val akkaClusterTyped = "com.typesafe.akka" %% "akka-cluster-typed" % play.core.PlayVersion.akkaVersion }