From 751385ced0a528de65f5c250ca5ce73ab0a619b4 Mon Sep 17 00:00:00 2001 From: Khalid Shakir Date: Tue, 7 Jul 2015 21:42:51 -0300 Subject: [PATCH] Updates to fix coercion errors. DSDEEPB-727 Fixed Symbol doubly qualifying the symbol names. (thx mcovarr/scottfrazer) Added `WdlType.fromRawString`, with test against respective `WdlValue.toRawString`. `DummyDataAccess` replaced with using `DataAccess` instances, with cleanup of connections. When creating in memory databases will create unique `DataAccess` instances, just like Dummy. TestSlickDatabase now prints a warning, instead of an error, when unable to connect to MySql. --- src/main/resources/reference.conf | 2 +- src/main/scala/cromwell/Main.scala | 2 + .../scala/cromwell/binding/WdlNamespace.scala | 2 + .../binding/types/WdlBooleanType.scala | 3 +- .../binding/types/WdlExpressionType.scala | 3 +- .../cromwell/binding/types/WdlFileType.scala | 2 + .../cromwell/binding/types/WdlFloatType.scala | 3 +- .../binding/types/WdlIntegerType.scala | 6 +- .../binding/types/WdlNamespaceType.scala | 1 + .../binding/types/WdlObjectType.scala | 2 + .../binding/types/WdlStringType.scala | 2 + .../cromwell/binding/types/WdlType.scala | 7 + .../cromwell/binding/values/WdlObject.scala | 2 + .../binding/values/WdlPrimitive.scala | 4 +- .../cromwell/binding/values/WdlValue.scala | 3 +- .../scala/cromwell/engine/db/DataAccess.scala | 23 +++- .../db/QueryWorkflowExecutionResult.scala | 11 -- .../engine/db/slick/SlickDataAccess.scala | 49 +++++-- .../cromwell/server/CromwellServer.scala | 11 +- .../server/DefaultWorkflowManagerSystem.scala | 3 - .../server/WorkflowManagerSystem.scala | 12 +- src/test/scala/cromwell/MainSpec.scala | 9 +- .../cromwell/SimpleWorkflowActorSpec.scala | 9 +- .../scala/cromwell/ThreeStepActorSpec.scala | 10 +- .../binding/values/WdlValueSpec.scala | 103 ++++++++++++++ .../engine/WorkflowManagerActorSpec.scala | 128 +++++++++--------- .../cromwell/engine/db/DummyDataAccess.scala | 99 -------------- .../engine/db/slick/SlickDataAccessSpec.scala | 17 ++- .../engine/db/slick/TestSlickDatabase.scala | 20 +-- .../SingleWorkflowRunnerActorSpec.scala | 19 ++- .../CromwellApiServiceIntegrationSpec.scala | 15 +- 31 files changed, 341 insertions(+), 241 deletions(-) delete mode 100644 src/main/scala/cromwell/engine/db/QueryWorkflowExecutionResult.scala create mode 100644 src/test/scala/cromwell/binding/values/WdlValueSpec.scala delete mode 100644 src/test/scala/cromwell/engine/db/DummyDataAccess.scala diff --git a/src/main/resources/reference.conf b/src/main/resources/reference.conf index e3d14336264..a878fcd549f 100644 --- a/src/main/resources/reference.conf +++ b/src/main/resources/reference.conf @@ -30,7 +30,7 @@ database { main { hsqldb { - url = "jdbc:hsqldb:mem:tempdb;shutdown=false" + url = "jdbc:hsqldb:mem:${slick.uniqueSchema};shutdown=false" driver = "org.hsqldb.jdbcDriver" slick.driver = "slick.driver.HsqldbDriver" slick.createSchema = true diff --git a/src/main/scala/cromwell/Main.scala b/src/main/scala/cromwell/Main.scala index 5a8e2e516c5..b4d7236c5f8 100644 --- a/src/main/scala/cromwell/Main.scala +++ b/src/main/scala/cromwell/Main.scala @@ -93,6 +93,8 @@ object Main extends App { case e: Exception => println("Unable to process inputs") e.printStackTrace() + } finally { + workflowManagerSystem.shutdown() // Future[Unit], but give it a shot } } diff --git a/src/main/scala/cromwell/binding/WdlNamespace.scala b/src/main/scala/cromwell/binding/WdlNamespace.scala index 1faa6a6c6ba..5e28a902d26 100644 --- a/src/main/scala/cromwell/binding/WdlNamespace.scala +++ b/src/main/scala/cromwell/binding/WdlNamespace.scala @@ -421,5 +421,7 @@ case class WdlNamespace(ast: Ast, source: WdlSource, importResolver: ImportResol } } + override def toRawString = ??? + validate() } diff --git a/src/main/scala/cromwell/binding/types/WdlBooleanType.scala b/src/main/scala/cromwell/binding/types/WdlBooleanType.scala index 7fa4b57938e..497e5a8ef7c 100644 --- a/src/main/scala/cromwell/binding/types/WdlBooleanType.scala +++ b/src/main/scala/cromwell/binding/types/WdlBooleanType.scala @@ -14,5 +14,6 @@ case object WdlBooleanType extends WdlType { case s: JsString if s.value.equalsIgnoreCase("false") => WdlBoolean.False case s: JsBoolean => WdlBoolean(s.value) } -} + override def fromRawString(rawString: String) = WdlBoolean(rawString.toBoolean) +} diff --git a/src/main/scala/cromwell/binding/types/WdlExpressionType.scala b/src/main/scala/cromwell/binding/types/WdlExpressionType.scala index 92854b0ed15..2f554e9933e 100644 --- a/src/main/scala/cromwell/binding/types/WdlExpressionType.scala +++ b/src/main/scala/cromwell/binding/types/WdlExpressionType.scala @@ -8,5 +8,6 @@ case object WdlExpressionType extends WdlType { override protected def coercion = { case s: String if s.startsWith("%expr:") => WdlExpression.fromString(s.replace("%expr:", "")) } -} + override def fromRawString(rawString: String) = WdlExpression.fromString(rawString) +} diff --git a/src/main/scala/cromwell/binding/types/WdlFileType.scala b/src/main/scala/cromwell/binding/types/WdlFileType.scala index e4e12193c2f..ae0350abea1 100644 --- a/src/main/scala/cromwell/binding/types/WdlFileType.scala +++ b/src/main/scala/cromwell/binding/types/WdlFileType.scala @@ -10,4 +10,6 @@ case object WdlFileType extends WdlType { case s: String => WdlFile(s) case s: JsString => WdlFile(s.value) } + + override def fromRawString(rawString: String) = WdlFile(rawString) } diff --git a/src/main/scala/cromwell/binding/types/WdlFloatType.scala b/src/main/scala/cromwell/binding/types/WdlFloatType.scala index 144db54bb48..9d306d3d89c 100644 --- a/src/main/scala/cromwell/binding/types/WdlFloatType.scala +++ b/src/main/scala/cromwell/binding/types/WdlFloatType.scala @@ -10,5 +10,6 @@ case object WdlFloatType extends WdlType { case d: Double => WdlFloat(d) case n: JsNumber => WdlFloat(n.value.doubleValue()) } -} + override def fromRawString(rawString: String) = WdlFloat(rawString.toFloat) +} diff --git a/src/main/scala/cromwell/binding/types/WdlIntegerType.scala b/src/main/scala/cromwell/binding/types/WdlIntegerType.scala index c46c36b13c5..05b0d063b26 100644 --- a/src/main/scala/cromwell/binding/types/WdlIntegerType.scala +++ b/src/main/scala/cromwell/binding/types/WdlIntegerType.scala @@ -8,6 +8,8 @@ case object WdlIntegerType extends WdlType { override protected def coercion = { case i: Integer => WdlInteger(i) - case n: JsNumber => WdlInteger(n.value.intValue) + case n: JsNumber => WdlInteger(n.value.intValue()) } -} \ No newline at end of file + + override def fromRawString(rawString: String) = WdlInteger(rawString.toInt) +} diff --git a/src/main/scala/cromwell/binding/types/WdlNamespaceType.scala b/src/main/scala/cromwell/binding/types/WdlNamespaceType.scala index bc88617a1ae..a652454c6cc 100644 --- a/src/main/scala/cromwell/binding/types/WdlNamespaceType.scala +++ b/src/main/scala/cromwell/binding/types/WdlNamespaceType.scala @@ -3,4 +3,5 @@ package cromwell.binding.types case object WdlNamespaceType extends WdlType { override def toWdlString: String = "Namespace" override protected def coercion = ??? + override def fromRawString(rawString: String) = ??? } diff --git a/src/main/scala/cromwell/binding/types/WdlObjectType.scala b/src/main/scala/cromwell/binding/types/WdlObjectType.scala index d6b0c40dde5..2206d050521 100644 --- a/src/main/scala/cromwell/binding/types/WdlObjectType.scala +++ b/src/main/scala/cromwell/binding/types/WdlObjectType.scala @@ -4,4 +4,6 @@ case object WdlObjectType extends WdlType { val toWdlString: String = "Object" override protected def coercion = ??? + + override def fromRawString(rawString: String) = ??? } diff --git a/src/main/scala/cromwell/binding/types/WdlStringType.scala b/src/main/scala/cromwell/binding/types/WdlStringType.scala index ebaaaaf5043..62da1befac7 100644 --- a/src/main/scala/cromwell/binding/types/WdlStringType.scala +++ b/src/main/scala/cromwell/binding/types/WdlStringType.scala @@ -10,4 +10,6 @@ case object WdlStringType extends WdlType { case s: String => WdlString(s) case s: JsString => WdlString(s.value) } + + override def fromRawString(rawString: String) = WdlString(rawString) } diff --git a/src/main/scala/cromwell/binding/types/WdlType.scala b/src/main/scala/cromwell/binding/types/WdlType.scala index b0a96985c95..dc79d295aa4 100644 --- a/src/main/scala/cromwell/binding/types/WdlType.scala +++ b/src/main/scala/cromwell/binding/types/WdlType.scala @@ -28,6 +28,13 @@ trait WdlType { } def toWdlString: String + + /** + * Converts rawString generated from WdlValue.toRawString back to a WdlValue. + * @param rawString Raw string generated by WdlValue.toRawString + * @return The WdlValue + */ + def fromRawString(rawString: String): WdlValue } object WdlType { diff --git a/src/main/scala/cromwell/binding/values/WdlObject.scala b/src/main/scala/cromwell/binding/values/WdlObject.scala index 8be1e6c00e2..68db67d21f0 100644 --- a/src/main/scala/cromwell/binding/values/WdlObject.scala +++ b/src/main/scala/cromwell/binding/values/WdlObject.scala @@ -4,4 +4,6 @@ import cromwell.binding.types.WdlObjectType case class WdlObject(value: Map[String, WdlValue]) extends WdlValue { val wdlType = WdlObjectType + + override def toRawString = ??? } diff --git a/src/main/scala/cromwell/binding/values/WdlPrimitive.scala b/src/main/scala/cromwell/binding/values/WdlPrimitive.scala index 4a3ce398251..ec9e56d28c4 100644 --- a/src/main/scala/cromwell/binding/values/WdlPrimitive.scala +++ b/src/main/scala/cromwell/binding/values/WdlPrimitive.scala @@ -1,8 +1,6 @@ package cromwell.binding.values -import scala.util.Try - trait WdlPrimitive extends WdlValue { def asString: String - override def toRawString = Try(asString) + override def toRawString = asString } diff --git a/src/main/scala/cromwell/binding/values/WdlValue.scala b/src/main/scala/cromwell/binding/values/WdlValue.scala index 2b467342255..5ad0653d06c 100644 --- a/src/main/scala/cromwell/binding/values/WdlValue.scala +++ b/src/main/scala/cromwell/binding/values/WdlValue.scala @@ -26,5 +26,6 @@ trait WdlValue { def not: Try[WdlValue] = invalid(s"!$this") def unaryPlus: Try[WdlValue] = invalid(s"+$this") def unaryMinus: Try[WdlValue] = invalid(s"-$this") - def toRawString: Try[String] = Try(toString) + def toRawString: String = toString + def typeName: String = wdlType.getClass.getSimpleName } diff --git a/src/main/scala/cromwell/engine/db/DataAccess.scala b/src/main/scala/cromwell/engine/db/DataAccess.scala index 93d91869258..e7cd7e726b5 100644 --- a/src/main/scala/cromwell/engine/db/DataAccess.scala +++ b/src/main/scala/cromwell/engine/db/DataAccess.scala @@ -5,11 +5,30 @@ import cromwell.binding.{Call, FullyQualifiedName, WdlJson, WdlSource} import cromwell.engine.backend.Backend import cromwell.engine.{SymbolStoreEntry, WorkflowId, WorkflowState} -import scala.concurrent.Future +import scala.concurrent.duration.Duration +import scala.concurrent.{Await, Future} object DataAccess { def apply(): DataAccess = new slick.SlickDataAccess() + /** + * Creates a DataAccess instance, loans it to the function, + * and then attempts to shut down the instance. + * + * @param f Function to run on the data access. + * @tparam T Return type of the function. + * @return Result of calling the function with the dataAccess instance. + */ + def withDataAccess[T](f: DataAccess => T): T = { + val dataAccess = DataAccess() + try { + f(dataAccess) + } finally { + // NOTE: shutdown result thrown away + Await.ready(dataAccess.shutdown(), Duration.Inf) + } + } + // TODO PLEASE RENAME ME case class WorkflowInfo(workflowId: WorkflowId, wdlSource: WdlSource, wdlJson: WdlJson) } @@ -54,4 +73,6 @@ trait DataAccess { def getExecutionStatuses(workflowId: WorkflowId): Future[Map[FullyQualifiedName, CallStatus]] + /** Shutdown. NOTE: Should (internally or explicitly) use AsyncExecutor.shutdownExecutor. */ + def shutdown(): Future[Unit] } diff --git a/src/main/scala/cromwell/engine/db/QueryWorkflowExecutionResult.scala b/src/main/scala/cromwell/engine/db/QueryWorkflowExecutionResult.scala deleted file mode 100644 index 04844453d9a..00000000000 --- a/src/main/scala/cromwell/engine/db/QueryWorkflowExecutionResult.scala +++ /dev/null @@ -1,11 +0,0 @@ -package cromwell.engine.db - -import java.util.Date - -import cromwell.binding.WdlSource -import cromwell.engine.{SymbolStoreEntry, WorkflowId, WorkflowState} - -case class QueryWorkflowExecutionResult(workflowId: WorkflowId, wdlUri: String, state: WorkflowState, - startTime: Date, endTime: Option[Date], - calls: Set[CallBackendInfo], symbols: Set[SymbolStoreEntry], - wdlSource: WdlSource, jsonInputs: String) diff --git a/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala b/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala index ae83fed8cc6..0390c3b5c48 100644 --- a/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala +++ b/src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala @@ -5,10 +5,9 @@ import java.util.{Date, UUID} import javax.sql.rowset.serial.SerialClob import _root_.slick.util.ConfigExtensionMethods._ -import com.typesafe.config.Config +import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import cromwell.binding._ import cromwell.binding.types.WdlType -import cromwell.binding.values.WdlValue import cromwell.engine._ import cromwell.engine.backend.Backend import cromwell.engine.backend.local.LocalBackend @@ -36,6 +35,32 @@ object SlickDataAccess { implicit class StringToClob(val str: String) extends AnyVal { def toClob: Clob = new SerialClob(str.toCharArray) } + + implicit class ConfigWithUniqueSchema(val config: Config) extends AnyVal { + /** + * Modifies config.getString("url") to return a unique schema, if the original url contains the text + * "${slick.uniqueSchema}". + * + * This allows each instance of a SlickDataAccess object to use a clean, and different, in memory database. + * + * @return Config with ${slick.uniqueSchema} in url replaced with a unique string. + */ + def withUniqueSchema: Config = { + val url = config.getString("url") + if (url.contains("${slick.uniqueSchema}")) { + // Config wasn't updating with a simple withValue/withFallback. + // So instead, do a bit of extra work to insert the generated schema name in the url. + val schema = UUID.randomUUID().toString + val newUrl = url.replaceAll("""\$\{slick\.uniqueSchema\}""", schema) + val origin = "url with slick.uniqueSchema=" + schema + val urlConfigValue = ConfigValueFactory.fromAnyRef(newUrl, origin) + val urlConfig = ConfigFactory.empty(origin).withValue("url", urlConfigValue) + urlConfig.withFallback(config) + } else { + config + } + } + } } class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponent) extends DataAccess { @@ -56,7 +81,7 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen import dataAccess.driver.api._ // NOTE: if you want to refactor database is inner-class type: this.dataAccess.driver.backend.DatabaseFactory - val database = Database.forConfig("", databaseConfig) + val database = Database.forConfig("", databaseConfig.withUniqueSchema) // Possibly create the database { @@ -70,11 +95,18 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen // used in key specification without a key length // // Perhaps we'll use a more optimized data type for UUID's bytes in the future, as a FK, instead auto-inc cols + // + // The value `${slick.uniqueSchema}` may be used in the url, in combination with `slick.createSchema = true`, to + // generate unique schema instances that don't conflict. + // + // Otherwise, create one DataAccess and hold on to the reference. if (databaseConfig.getBooleanOr("slick.createSchema")) { Await.result(database.run(dataAccess.schema.create), Duration.Inf) } } + override def shutdown() = database.shutdown + // Run action with an outer transaction private def runTransaction[R](action: DBIOAction[R, _ <: NoStream, _ <: Effect]): Future[R] = { database.run(action.transactionally) @@ -127,7 +159,7 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen symbol.key.iteration.getOrElse(IterationNone), if (symbol.isInput) IoInput else IoOutput, symbol.wdlType.toWdlString, - symbol.wdlValue.map(_.toRawString.get)) + symbol.wdlValue.map(_.toRawString)) } } @@ -341,8 +373,7 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen new SymbolStoreEntry( symbolStoreKey, wdlType, - symbolResult.wdlValue.map(wdlType.coerceRawValue(_).get)) - + symbolResult.wdlValue.map(wdlType.fromRawString)) } } yield symbolStoreEntries @@ -351,7 +382,7 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen } /** Should fail if a value is already set. The keys in the Map are locally qualified names. */ - override def setOutputs(workflowId: WorkflowId, call: Call, callOutputs: Map[String, WdlValue]): Future[Unit] = { + override def setOutputs(workflowId: WorkflowId, call: Call, callOutputs: WorkflowOutputs): Future[Unit] = { val action = for { workflowExecution <- dataAccess.workflowExecutionsByWorkflowExecutionUuid(workflowId.toString).result.head @@ -360,11 +391,11 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen new Symbol( workflowExecution.workflowExecutionId.get, call.fullyQualifiedName, - call.fullyQualifiedName + "." + symbolLocallyQualifiedName, + symbolLocallyQualifiedName, IterationNone, IoOutput, wdlValue.wdlType.toWdlString, - Option(wdlValue.toRawString.get)) + Option(wdlValue.toRawString)) } } yield () diff --git a/src/main/scala/cromwell/server/CromwellServer.scala b/src/main/scala/cromwell/server/CromwellServer.scala index 13665049372..bacc98258f5 100644 --- a/src/main/scala/cromwell/server/CromwellServer.scala +++ b/src/main/scala/cromwell/server/CromwellServer.scala @@ -7,7 +7,6 @@ import akka.pattern.ask import akka.util.Timeout import com.typesafe.config.ConfigFactory import com.wordnik.swagger.model.ApiInfo -import cromwell.engine.db.DataAccess import cromwell.webservice.{CromwellApiService, CromwellApiServiceActor, SwaggerService} import spray.can.Http @@ -18,7 +17,12 @@ import scala.util.{Failure, Success} // Note that as per the language specification, this is instantiated lazily and only used when necessary (i.e. server mode) object CromwellServer extends DefaultWorkflowManagerSystem { val conf = ConfigFactory.parseFile(new File("/etc/cromwell.conf")) - private lazy val realDataAccess = DataAccess() + + // NOTE: Currently the this.dataAccess is passed in to this.workflowManagerActor. + // The actor could technically restart with the same instance of the dataAccess, + // So, we're not shutting down dataAccess during this.workflowManagerActor.postStop() nor this.service.postStop(). + // Not sure otherwise when this server is really shutting down, so this.dataAccess currently never explicitly closed. + // Shouldn't be an issue unless perhaps test code tries to launch multiple servers and leaves dangling connections. val swaggerConfig = conf.getConfig("swagger") val swaggerService = new SwaggerService( @@ -51,7 +55,4 @@ object CromwellServer extends DefaultWorkflowManagerSystem { case _ => actorSystem.log.info("Cromwell service started...") } - - override def dataAccess: DataAccess = realDataAccess } - diff --git a/src/main/scala/cromwell/server/DefaultWorkflowManagerSystem.scala b/src/main/scala/cromwell/server/DefaultWorkflowManagerSystem.scala index 10dacb2a42d..ab189e38c88 100644 --- a/src/main/scala/cromwell/server/DefaultWorkflowManagerSystem.scala +++ b/src/main/scala/cromwell/server/DefaultWorkflowManagerSystem.scala @@ -1,7 +1,4 @@ package cromwell.server -import cromwell.engine.db.DataAccess - case class DefaultWorkflowManagerSystem() extends WorkflowManagerSystem { - def dataAccess: DataAccess = DataAccess() } diff --git a/src/main/scala/cromwell/server/WorkflowManagerSystem.scala b/src/main/scala/cromwell/server/WorkflowManagerSystem.scala index 0cd805ea54e..0c8e59349f2 100644 --- a/src/main/scala/cromwell/server/WorkflowManagerSystem.scala +++ b/src/main/scala/cromwell/server/WorkflowManagerSystem.scala @@ -9,5 +9,13 @@ trait WorkflowManagerSystem { implicit val actorSystem = ActorSystem(systemName) lazy val workflowManagerActor = actorSystem.actorOf(WorkflowManagerActor.props(dataAccess)) - def dataAccess: DataAccess -} \ No newline at end of file + // Lazily created as the primary consumer is the workflowManagerActor. + private lazy val dataAccess: DataAccess = DataAccess() + + /** + * Should be called after the system is no longer in use. + * + * @return Non-blocking future that will eventually shut down this instance or return an error. + */ + def shutdown() = dataAccess.shutdown() +} diff --git a/src/test/scala/cromwell/MainSpec.scala b/src/test/scala/cromwell/MainSpec.scala index 2236373dd71..cbf50f2b647 100644 --- a/src/test/scala/cromwell/MainSpec.scala +++ b/src/test/scala/cromwell/MainSpec.scala @@ -3,17 +3,16 @@ package cromwell import akka.actor.ActorSystem import akka.testkit.EventFilter import com.typesafe.config.ConfigFactory -import cromwell.engine.db.DummyDataAccess -import cromwell.engine.workflow.WorkflowManagerActor import cromwell.server.WorkflowManagerSystem import cromwell.util.FileUtil import cromwell.util.SampleWdl.ThreeStep import org.scalatest.{FlatSpec, Matchers} +import scala.concurrent.Await +import scala.concurrent.duration.Duration + class TestWorkflowManagerSystem extends WorkflowManagerSystem { - override def dataAccess = DummyDataAccess() override implicit val actorSystem = ActorSystem(systemName, ConfigFactory.parseString(CromwellTestkitSpec.ConfigText)) - override lazy val workflowManagerActor = actorSystem.actorOf(WorkflowManagerActor.props(dataAccess)) } class MainSpec extends FlatSpec with Matchers { @@ -102,12 +101,12 @@ class MainSpec extends FlatSpec with Matchers { } it should "run" in { - val stream = new java.io.ByteArrayOutputStream() val workflowManagerSystem = new TestWorkflowManagerSystem implicit val system = workflowManagerSystem.actorSystem EventFilter.info(pattern = s"workflow finished", occurrences = 1).intercept { Main.run(Array(wdlFilePathAndWriter._1.toAbsolutePath.toString, inputsJsonPathAndWriter._1.toAbsolutePath.toString), workflowManagerSystem) } + Await.result(workflowManagerSystem.shutdown(), Duration.Inf) } it should "print usage" in { diff --git a/src/test/scala/cromwell/SimpleWorkflowActorSpec.scala b/src/test/scala/cromwell/SimpleWorkflowActorSpec.scala index beb77475aac..3646360a254 100644 --- a/src/test/scala/cromwell/SimpleWorkflowActorSpec.scala +++ b/src/test/scala/cromwell/SimpleWorkflowActorSpec.scala @@ -8,7 +8,7 @@ import cromwell.binding._ import cromwell.binding.values.WdlString import cromwell.engine._ import cromwell.engine.backend.local.LocalBackend -import cromwell.engine.db.DummyDataAccess +import cromwell.engine.db.DataAccess import cromwell.engine.workflow.WorkflowActor import cromwell.engine.workflow.WorkflowActor._ import cromwell.util.SampleWdl @@ -21,6 +21,8 @@ import scala.language.postfixOps class SimpleWorkflowActorSpec extends CromwellTestkitSpec("SimpleWorkflowActorSpec") { + private val dataAccess = DataAccess() + private def buildWorkflowFSMRef(sampleWdl: SampleWdl, rawInputsOverride: Option[WorkflowRawInputs] = None): TestFSMRef[WorkflowState, WorkflowFailure, WorkflowActor] = { @@ -28,11 +30,14 @@ class SimpleWorkflowActorSpec extends CromwellTestkitSpec("SimpleWorkflowActorSp val rawInputs = rawInputsOverride.getOrElse(sampleWdl.rawInputs) val coercedInputs = namespace.coerceRawInputs(rawInputs).get val descriptor = WorkflowDescriptor(UUID.randomUUID(), namespace, sampleWdl.wdlSource(), sampleWdl.wdlJson, coercedInputs) - TestFSMRef(new WorkflowActor(descriptor, new LocalBackend, DummyDataAccess())) + TestFSMRef(new WorkflowActor(descriptor, new LocalBackend, dataAccess)) } override def afterAll() { + // TODO: Is this shutting down `system`? shutdown() + super.afterAll() + Await.result(dataAccess.shutdown(), Duration.Inf) } val TestExecutionTimeout = 5000 milliseconds diff --git a/src/test/scala/cromwell/ThreeStepActorSpec.scala b/src/test/scala/cromwell/ThreeStepActorSpec.scala index 29dd26ac68a..13e86478b8a 100644 --- a/src/test/scala/cromwell/ThreeStepActorSpec.scala +++ b/src/test/scala/cromwell/ThreeStepActorSpec.scala @@ -9,7 +9,7 @@ import cromwell.binding._ import cromwell.binding.values.WdlInteger import cromwell.engine._ import cromwell.engine.backend.local.LocalBackend -import cromwell.engine.db.DummyDataAccess +import cromwell.engine.db.DataAccess import cromwell.engine.workflow.WorkflowActor import cromwell.engine.workflow.WorkflowActor._ import cromwell.util.SampleWdl @@ -28,12 +28,18 @@ object ThreeStepActorSpec { class ThreeStepActorSpec extends CromwellTestkitSpec("ThreeStepActorSpec") { import ThreeStepActorSpec._ + val dataAccess = DataAccess() + + override protected def afterAll() = { + super.afterAll() + dataAccess.shutdown() + } + private def buildFsmWorkflowActor(sampleWdl: SampleWdl, runtime: String) = { val namespace = WdlNamespace.load(sampleWdl.wdlSource(runtime)) // This is a test and is okay with just throwing if coerceRawInputs returns a Failure. val coercedInputs = namespace.coerceRawInputs(sampleWdl.rawInputs).get val descriptor = WorkflowDescriptor(UUID.randomUUID(), namespace, sampleWdl.wdlSource(runtime), sampleWdl.wdlJson, coercedInputs) - val dataAccess = new DummyDataAccess() TestFSMRef(new WorkflowActor(descriptor, new LocalBackend, dataAccess)) } diff --git a/src/test/scala/cromwell/binding/values/WdlValueSpec.scala b/src/test/scala/cromwell/binding/values/WdlValueSpec.scala new file mode 100644 index 00000000000..de34337f014 --- /dev/null +++ b/src/test/scala/cromwell/binding/values/WdlValueSpec.scala @@ -0,0 +1,103 @@ +package cromwell.binding.values + +import java.nio.file.Paths + +import cromwell.binding.{WdlExpression, WdlNamespace} +import cromwell.util.SampleWdl +import org.scalatest.prop.TableDrivenPropertyChecks +import org.scalatest.{FlatSpec, Matchers} + +class WdlValueSpec extends FlatSpec with Matchers { + + import TableDrivenPropertyChecks._ + + behavior of "WdlValue" + + val wdlValueRawStrings = Table( + ("wdlValue", "rawString"), + (WdlBoolean.False, "false"), + (WdlBoolean.True, "true"), + (WdlFile(Paths.get("hello/world/path")), "hello/world/path"), + (WdlFile("hello/world/string"), "hello/world/string"), + (WdlFloat(0.0), "0.0"), + (WdlFloat(-0.0), "-0.0"), + (WdlFloat(Double.PositiveInfinity), "Infinity"), + (WdlFloat(Double.NegativeInfinity), "-Infinity"), + (WdlInteger(0), "0"), + (WdlInteger(Int.MaxValue), "2147483647"), + (WdlInteger(Int.MinValue), "-2147483648"), + (WdlString(""), ""), + (WdlString("test\\'/string"), "test\\'/string")) + + forAll(wdlValueRawStrings) { (wdlValue, rawString) => + it should s"exactly convert a ${wdlValue.typeName} to/from raw string '$rawString'" in { + val toRawString = wdlValue.toRawString + toRawString should be(rawString) + + val wdlType = wdlValue.wdlType + val fromRawString = wdlType.fromRawString(toRawString) + fromRawString should be(wdlValue) + fromRawString.wdlType should be(wdlType) + } + } + + val wdlFloatSpecials = Table( + ("wdlValue", "rawString", "validateFloat"), + (WdlFloat(Double.MinPositiveValue), "4.9E-324", { d: Double => d == 0.0 }), + (WdlFloat(Double.NaN), "NaN", { d: Double => d.isNaN }), + (WdlFloat(Double.MaxValue), "1.7976931348623157E308", { d: Double => d.isPosInfinity }), + (WdlFloat(Double.MinValue), "-1.7976931348623157E308", { d: Double => d.isNegInfinity })) + + forAll(wdlFloatSpecials) { (wdlValue, rawString, validateFloat) => + it should s"convert a special ${wdlValue.typeName} to/from raw string '$rawString'" in { + val toRawString = wdlValue.toRawString + toRawString should be(rawString) + + val wdlType = wdlValue.wdlType + val fromRawString = wdlType.fromRawString(toRawString) + // Test that this is a special conversion, and is not + // expected to be equal after a round-trip conversion. + fromRawString shouldNot be(wdlValue) + validateFloat(fromRawString.value) should be(right = true) + fromRawString.wdlType should be(wdlType) + } + } + + val wdlExpressionRawStrings = Table( + ("wdlValue", "rawString"), + (WdlExpression.fromString(" 1 != 0 "), "1 != 0"), + (WdlExpression.fromString("10 % 3.5"), "10 % 3.5"), + (WdlExpression.fromString("10 % 3"), "10 % 3"), + (WdlExpression.fromString("10-6.7"), "10 - 6.7"), + (WdlExpression.fromString(""" 1 + "String" """), """1 + "String""""), + (WdlExpression.fromString("a + b"), "a + b"), + (WdlExpression.fromString("a(b, c)"), "a(b, c)"), + (WdlExpression.fromString("\"a\" + \"b\""), "\"a\" + \"b\""), + (WdlExpression.fromString("a.b.c"), "a.b.c")) + + forAll(wdlExpressionRawStrings) { (wdlValue, rawString) => + it should s"resemble a ${wdlValue.typeName} to/from raw string '$rawString'" in { + val toRawString = wdlValue.toRawString + toRawString should be(rawString) + + val wdlType = wdlValue.wdlType + val fromRawString = wdlType.fromRawString(toRawString) + fromRawString shouldNot be(wdlValue) + fromRawString.toString should be(wdlValue.toString) + fromRawString.wdlType should be(wdlType) + } + } + + val notImplementRawString = Table( + "wdlValue", + WdlObject(Map("key" -> WdlString("value"))), + WdlNamespace.load(SampleWdl.HelloWorld.wdlSource())) + + forAll(notImplementRawString) { wdlValue => + it should s"not implement a ${wdlValue.typeName} raw string" in { + a [NotImplementedError] should be thrownBy wdlValue.toRawString + val wdlType = wdlValue.wdlType + a [NotImplementedError] should be thrownBy wdlType.fromRawString("") + } + } +} diff --git a/src/test/scala/cromwell/engine/WorkflowManagerActorSpec.scala b/src/test/scala/cromwell/engine/WorkflowManagerActorSpec.scala index 65dace88a71..774f25afd95 100644 --- a/src/test/scala/cromwell/engine/WorkflowManagerActorSpec.scala +++ b/src/test/scala/cromwell/engine/WorkflowManagerActorSpec.scala @@ -1,22 +1,24 @@ package cromwell.engine -import java.util.{Calendar, UUID} +import java.util.UUID import akka.testkit.{EventFilter, TestActorRef} +import cromwell.binding._ +import cromwell.binding.command.Command import cromwell.binding.types.WdlStringType import cromwell.binding.values.WdlString import cromwell.engine.ExecutionStatus.{NotStarted, Running} +import cromwell.engine.backend.local.LocalBackend +import cromwell.engine.db.DataAccess import cromwell.engine.db.DataAccess.WorkflowInfo -import cromwell.engine.db.{DummyDataAccess, QueryWorkflowExecutionResult} import cromwell.engine.workflow.WorkflowManagerActor import cromwell.engine.workflow.WorkflowManagerActor.{SubmitWorkflow, WorkflowOutputs, WorkflowStatus} import cromwell.util.SampleWdl import cromwell.util.SampleWdl.{HelloWorld, Incr} import cromwell.{CromwellTestkitSpec, binding} -import scala.collection.concurrent.TrieMap -import scala.concurrent.Future -import scala.concurrent.duration._ +import scala.concurrent.duration.{Duration, _} +import scala.concurrent.{Await, ExecutionContext, Future} import scala.language.postfixOps import scala.util.{Failure, Success, Try} @@ -26,64 +28,67 @@ class WorkflowManagerActorSpec extends CromwellTestkitSpec("WorkflowManagerActor "A WorkflowManagerActor" should { "run the Hello World workflow" in { - implicit val workflowManagerActor = TestActorRef(WorkflowManagerActor.props(DummyDataAccess()), self, "Test the WorkflowManagerActor") + DataAccess.withDataAccess { dataAccess => + implicit val workflowManagerActor = TestActorRef(WorkflowManagerActor.props(dataAccess), self, "Test the WorkflowManagerActor") - val workflowId = waitForHandledMessagePattern(pattern = "Transition\\(.*,Running,Succeeded\\)$") { - messageAndWait[WorkflowId](SubmitWorkflow(HelloWorld.wdlSource(), HelloWorld.wdlJson, HelloWorld.rawInputs)) - } + val workflowId = waitForHandledMessagePattern(pattern = "Transition\\(.*,Running,Succeeded\\)$") { + messageAndWait[WorkflowId](SubmitWorkflow(HelloWorld.wdlSource(), HelloWorld.wdlJson, HelloWorld.rawInputs)) + } - val status = messageAndWait[Option[WorkflowState]](WorkflowStatus(workflowId)).get - status shouldEqual WorkflowSucceeded + val status = messageAndWait[Option[WorkflowState]](WorkflowStatus(workflowId)).get + status shouldEqual WorkflowSucceeded - val outputs = messageAndWait[binding.WorkflowOutputs](WorkflowOutputs(workflowId)) + val outputs = messageAndWait[binding.WorkflowOutputs](WorkflowOutputs(workflowId)) - val actual = outputs.map { case (k, WdlString(string)) => k -> string } - actual shouldEqual Map(HelloWorld.OutputKey -> HelloWorld.OutputValue) + val actual = outputs.map { case (k, WdlString(string)) => k -> string } + actual shouldEqual Map(HelloWorld.OutputKey -> HelloWorld.OutputValue) + } } "Not try to restart any workflows when there are no workflows in restartable states" in { - waitForPattern("Found no workflows to restart.") { - TestActorRef(WorkflowManagerActor.props(DummyDataAccess()), self, "No workflows") + DataAccess.withDataAccess { dataAccess => + waitForPattern("Found no workflows to restart.") { + TestActorRef(WorkflowManagerActor.props(dataAccess), self, "No workflows") + } } } "Try to restart workflows when there are workflows in restartable states" in { - val (submitted, running) = (result(WorkflowSubmitted), result(WorkflowRunning)) - val workflows = Seq(submitted, running) - val ids = workflows.map { - _.workflowId.toString - }.sorted + val workflows = Map( + UUID.randomUUID() -> WorkflowSubmitted, + UUID.randomUUID() -> WorkflowRunning) + val ids = workflows.keys.map(_.toString).toSeq.sorted val key = SymbolStoreKey("hello.hello", "addressee", None, input = true) val symbols = Map(key -> new SymbolStoreEntry(key, WdlStringType, Option(WdlString("world")))) - val dataAccess = new DummyDataAccess() { - workflows foreach { workflow => - val id = workflow.workflowId - val status = if (id == submitted.workflowId) NotStarted else Running - executionStatuses(id) = TrieMap("hello.hello" -> status) - symbolStore(id) = TrieMap.empty - symbols foreach { case(symbolStoreKey, symbolStoreEntry) => - symbolStore(id)(symbolStoreKey) = symbolStoreEntry - } - } - - override def getWorkflowsByState(states: Traversable[WorkflowState]): Future[Traversable[WorkflowInfo]] = { - Future.successful { - workflows.map { w => - WorkflowInfo(w.workflowId, w.wdlSource, w.jsonInputs) - } + DataAccess.withDataAccess { dataAccess => + import ExecutionContext.Implicits.global + val setupFuture = Future.sequence( + workflows map { case (workflowId, workflowState) => + val wdlSource = SampleWdl.HelloWorld.wdlSource() + val wdlInputs = SampleWdl.HelloWorld.wdlJson + val status = if (workflowState == WorkflowSubmitted) NotStarted else Running + val workflowInfo = new WorkflowInfo(workflowId, wdlSource, wdlInputs) + val task = new Task("taskName", new Command(Seq.empty), Seq.empty, Map.empty) + val call = new Call(None, key.scope, task, Map.empty, null) + for { + _ <- dataAccess.createWorkflow(workflowInfo, symbols.values, Seq(call), new LocalBackend()) + _ <- dataAccess.updateWorkflowState(workflowId, workflowState) + _ <- dataAccess.setStatus(workflowId, Seq(call.fullyQualifiedName), status) + } yield () } - } - } - - waitForPattern("Restarting workflow IDs: " + ids.mkString(", ")) { - waitForPattern("Found 2 workflows to restart.") { - // Workflows are always set back to Submitted on restart. - waitForPattern("transitioning from Submitted to Running.", occurrences = 2) { - // Both the previously in-flight call and the never-started call should get started. - waitForPattern("starting calls: hello.hello", occurrences = 2) { - waitForPattern("transitioning from Running to Succeeded", occurrences = 2) { - TestActorRef(WorkflowManagerActor.props(dataAccess), self, "2 restartable workflows") + ) + Await.result(setupFuture, Duration.Inf) + + waitForPattern("Restarting workflow IDs: " + ids.mkString(", ")) { + waitForPattern("Found 2 workflows to restart.") { + // Workflows are always set back to Submitted on restart. + waitForPattern("transitioning from Submitted to Running.", occurrences = 2) { + // Both the previously in-flight call and the never-started call should get started. + waitForPattern("starting calls: hello.hello", occurrences = 2) { + waitForPattern("transitioning from Running to Succeeded", occurrences = 2) { + TestActorRef(WorkflowManagerActor.props(dataAccess), self, "2 restartable workflows") + } } } } @@ -94,25 +99,20 @@ class WorkflowManagerActorSpec extends CromwellTestkitSpec("WorkflowManagerActor val TestExecutionTimeout = 5000 milliseconds "Handle coercion failures gracefully" in { - within(TestExecutionTimeout) { - implicit val workflowManagerActor = TestActorRef(WorkflowManagerActor.props(DummyDataAccess()), self, "Test WorkflowManagerActor coercion failures") - EventFilter.error(pattern = "Workflow failed submission").intercept { - Try { - messageAndWait[WorkflowId](SubmitWorkflow(Incr.wdlSource(), Incr.wdlJson, Incr.rawInputs)) - } match { - case Success(_) => fail("Expected submission to fail with uncoercable inputs") - case Failure(e) => - e.getMessage shouldBe "Failed to coerce input incr.incr.val value 1 of class java.lang.String to WdlIntegerType." + DataAccess.withDataAccess { dataAccess => + within(TestExecutionTimeout) { + implicit val workflowManagerActor = TestActorRef(WorkflowManagerActor.props(dataAccess), self, "Test WorkflowManagerActor coercion failures") + EventFilter.error(pattern = "Workflow failed submission").intercept { + Try { + messageAndWait[WorkflowId](SubmitWorkflow(Incr.wdlSource(), Incr.wdlJson, Incr.rawInputs)) + } match { + case Success(_) => fail("Expected submission to fail with uncoercable inputs") + case Failure(e) => + e.getMessage shouldBe "Failed to coerce input incr.incr.val value 1 of class java.lang.String to WdlIntegerType." + } } } } } } - - def result(workflowState: WorkflowState, - wdlSource: String = SampleWdl.HelloWorld.wdlSource(), - wdlInputs: String = SampleWdl.HelloWorld.wdlJson): QueryWorkflowExecutionResult = { - QueryWorkflowExecutionResult( - UUID.randomUUID(), "http://wdl.me", workflowState, Calendar.getInstance().getTime, None, Set.empty, Set.empty, wdlSource, wdlInputs) - } } diff --git a/src/test/scala/cromwell/engine/db/DummyDataAccess.scala b/src/test/scala/cromwell/engine/db/DummyDataAccess.scala deleted file mode 100644 index fa49e9a21e9..00000000000 --- a/src/test/scala/cromwell/engine/db/DummyDataAccess.scala +++ /dev/null @@ -1,99 +0,0 @@ -package cromwell.engine.db - -import cromwell.binding.values.WdlValue -import cromwell.binding.{Call, FullyQualifiedName} -import cromwell.engine.ExecutionStatus.ExecutionStatus -import cromwell.engine._ -import cromwell.engine.backend.Backend -import cromwell.engine.db.DataAccess.WorkflowInfo - -import scala.collection.concurrent.TrieMap -import scala.concurrent.Future - -case class DummyDataAccess() extends DataAccess { - - private val workflowStates: TrieMap[WorkflowId, WorkflowState] = TrieMap.empty - - val executionStatuses: TrieMap[WorkflowId, TrieMap[String, ExecutionStatus]] = TrieMap.empty - - val symbolStore: TrieMap[WorkflowId, TrieMap[SymbolStoreKey, SymbolStoreEntry]] = TrieMap.empty - - /** - * Creates a row in each of the backend-info specific tables for each call in `calls` corresponding to the backend - * `backend`. Or perhaps defer this? - * FIXME does not do what the comment above says, fix behavior or comment. - */ - override def createWorkflow(workflowInfo: WorkflowInfo, symbols: Traversable[SymbolStoreEntry], - calls: Traversable[Call], backend: Backend): Future[Unit] = { - Future.successful { - val id = workflowInfo.workflowId - executionStatuses += (id -> TrieMap.empty) - symbolStore += (id -> TrieMap.empty) - setStatus(id, calls map { _.fullyQualifiedName }, ExecutionStatus.NotStarted) - symbols foreach { symbol => - symbolStore(id)(symbol.key) = symbol - } - } - } - - override def getWorkflowsByState(states: Traversable[WorkflowState]): Future[Traversable[WorkflowInfo]] = { - val statesSet = states.toSet - Future.successful(workflowStates.collect { case (id, state) if statesSet.contains(state) => WorkflowInfo(id, "", "")}) - } - - override def setStatus(workflowId: WorkflowId, callFqns: Traversable[FullyQualifiedName], callStatus: CallStatus): Future[Unit] = { - Future.successful { - callFqns foreach { callFqn => - executionStatuses(workflowId)(callFqn) = callStatus - } - } - } - - private def getSymbols(workflowId: WorkflowId, inputs: Boolean, scope: Option[String]): Traversable[SymbolStoreEntry] = { - def passesFilter(key: SymbolStoreKey): Boolean = key.input == inputs && (scope.isEmpty || scope.get == key.scope) - symbolStore(workflowId).collect { case (key, value) if passesFilter(key) => value } - } - - /** Returns all outputs for this workflowId */ - override def getOutputs(workflowId: WorkflowId): Future[Traversable[SymbolStoreEntry]] = { - Future.successful { getSymbols(workflowId, inputs = false, scope = None) } - } - - /** Get all outputs for the scope of this call. */ - override def getOutputs(workflowId: WorkflowId, call: Call): Future[Traversable[SymbolStoreEntry]] = { - Future.successful { getSymbols(workflowId, inputs = false, scope = Some(call.fullyQualifiedName)) } - } - - /** Get all inputs for the scope of this call. TODO refactor with above. */ - override def getInputs(workflowId: WorkflowId, call: Call): Future[Traversable[SymbolStoreEntry]] = { - println("looking up inputs for " + call.name) - Future.successful { getSymbols(workflowId, inputs = true, scope = Some(call.fullyQualifiedName)) } - } - - /** The keys in the Map are locally qualified names. */ - override def setOutputs(workflowId: WorkflowId, call: Call, callOutputs: Map[String, WdlValue]): Future[Unit] = { - Future.successful { - callOutputs foreach { case (name, wdlValue) => - val entry = SymbolStoreEntry(call.fullyQualifiedName + "." + name, wdlValue, input = false) - symbolStore(workflowId)(entry.key) = entry - } - } - } - - override def getExecutionStatuses(workflowId: WorkflowId): Future[Map[FullyQualifiedName, CallStatus]] = { - Future.successful(executionStatuses(workflowId).toMap) - } - - override def getExecutionBackendInfo(workflowId: WorkflowId, call: Call): Future[CallBackendInfo] = ??? - - override def updateWorkflowState(workflowId: WorkflowId, workflowState: WorkflowState): Future[Unit] = { - Future.successful(workflowStates += (workflowId -> workflowState)) - } - - override def getWorkflowState(workflowId: WorkflowId): Future[Option[WorkflowState]] = { - Future.successful(workflowStates.get(workflowId)) - } - - override def updateExecutionBackendInfo(workflowId: WorkflowId, call: Call, backendInfo: CallBackendInfo): Future[Unit] = ??? - -} diff --git a/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala b/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala index 80b2d7084ee..b1dc88e78ee 100644 --- a/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala +++ b/src/test/scala/cromwell/engine/db/slick/SlickDataAccessSpec.scala @@ -57,10 +57,8 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { def databaseWithConfig(path: => String, testRequired: => Boolean): Unit = { lazy val testDatabase = new TestSlickDatabase(path) + lazy val canConnect = testRequired || testDatabase.isValidConnection.futureValue lazy val dataAccess = testDatabase.slickDataAccess - lazy val canConnect = { - testRequired || (DatabaseConfig.rootDatabaseConfig.hasPath(path) && testDatabase.isValidConnection.futureValue) - } it should "setup via liquibase if necessary" in { assume(canConnect || testRequired) @@ -283,7 +281,7 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { val resultSymbol = results.head val resultSymbolStoreKey = resultSymbol.key resultSymbolStoreKey.scope should be("call.fully.qualified.scope") - resultSymbolStoreKey.name should be("call.fully.qualified.scope.symbol") + resultSymbolStoreKey.name should be("symbol") resultSymbolStoreKey.iteration should be(None) resultSymbolStoreKey.input should be(right = false) // IntelliJ highlighting resultSymbol.wdlType should be(WdlStringType) @@ -311,7 +309,7 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { val resultSymbol = results.head val resultSymbolStoreKey = resultSymbol.key resultSymbolStoreKey.scope should be("call.fully.qualified.scope") - resultSymbolStoreKey.name should be("call.fully.qualified.scope.symbol") + resultSymbolStoreKey.name should be("symbol") resultSymbolStoreKey.iteration should be(None) resultSymbolStoreKey.input should be(right = false) // IntelliJ highlighting resultSymbol.wdlType should be(WdlStringType) @@ -378,7 +376,7 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { val resultSymbol = results.head val resultSymbolStoreKey = resultSymbol.key resultSymbolStoreKey.scope should be("call.fully.qualified.scope") - resultSymbolStoreKey.name should be("call.fully.qualified.scope.symbol") + resultSymbolStoreKey.name should be("symbol") resultSymbolStoreKey.iteration should be(None) resultSymbolStoreKey.input should be(right = false) // IntelliJ highlighting resultSymbol.wdlType should be(WdlStringType) @@ -403,7 +401,7 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { (for { _ <- dataAccess.createWorkflow(workflowInfo, Seq(entry), Seq.empty, localBackend) _ <- dataAccess.updateWorkflowState(workflowId, WorkflowRunning) - _ <- dataAccess.setOutputs(workflowId, call, Map(symbolLqn -> new WdlString("testStringValue"))) + _ <- dataAccess.setOutputs(workflowId, call, Map(symbolFqn -> new WdlString("testStringValue"))) } yield ()).failed.futureValue should be(a[SQLException]) } @@ -484,6 +482,11 @@ class SlickDataAccessSpec extends FlatSpec with Matchers with ScalaFutures { _ <- dataAccess.updateExecutionBackendInfo(workflowId, call, null) } yield ()).failed.futureValue should be(an[IllegalArgumentException]) } + + it should "shutdown the database" in { + assume(canConnect || testRequired) + dataAccess.shutdown().futureValue + } } } diff --git a/src/test/scala/cromwell/engine/db/slick/TestSlickDatabase.scala b/src/test/scala/cromwell/engine/db/slick/TestSlickDatabase.scala index a7b5314ea2b..78c6ffb7b13 100644 --- a/src/test/scala/cromwell/engine/db/slick/TestSlickDatabase.scala +++ b/src/test/scala/cromwell/engine/db/slick/TestSlickDatabase.scala @@ -15,14 +15,18 @@ class TestSlickDatabase(configPath: String) { private lazy val databaseConfig = DatabaseConfig.rootDatabaseConfig.getConfig(configPath) private lazy val log = LoggerFactory.getLogger(classOf[TestSlickDatabase]) - val dataAccessComponent: DataAccessComponent = new DataAccessComponent(databaseConfig.getString("slick.driver")) + // NOTE: Using the import below for isValidConnection, but maybe not the lazy instance if the check fails. + lazy val dataAccessComponent: DataAccessComponent = new DataAccessComponent(databaseConfig.getString("slick.driver")) import dataAccessComponent.driver.api._ /** * Check the database connection. - * Can be run before operations that use slickDataAccess, - * but creates a whole new connection pool to do so. + * + * This check only produces warnings, not errors. The primary use case is checking if _optional_ tests can be run + * against a database configuration. + * + * Can be run before operations that use slickDataAccess, but creates a whole new connection pool to do so. */ def isValidConnection: Future[Boolean] = { implicit val executionContext = ExecutionContext.global @@ -30,15 +34,15 @@ class TestSlickDatabase(configPath: String) { log.debug("Opening test connection setup for " + configPath) Database.forConfig("", databaseConfig) } flatMap { database => - database.run(SimpleDBIO(_.connection.isValid(1))) recover { - case ex => - log.error("Unable to connect to database under config: " + configPath, ex) - false - } andThen { + database.run(SimpleDBIO(_.connection.isValid(1))) andThen { case _ => log.debug("Closing test connection setup for " + configPath) database.close() } + } recover { + case ex => + log.warn("Unable to connect to database under config: " + configPath, ex) + false } } diff --git a/src/test/scala/cromwell/engine/workflow/SingleWorkflowRunnerActorSpec.scala b/src/test/scala/cromwell/engine/workflow/SingleWorkflowRunnerActorSpec.scala index 91099351eeb..8f920bd00cb 100644 --- a/src/test/scala/cromwell/engine/workflow/SingleWorkflowRunnerActorSpec.scala +++ b/src/test/scala/cromwell/engine/workflow/SingleWorkflowRunnerActorSpec.scala @@ -1,24 +1,23 @@ package cromwell.engine.workflow import cromwell.CromwellTestkitSpec -import cromwell.binding.FullyQualifiedName -import cromwell.engine.ExecutionStatus.NotStarted -import cromwell.engine.WorkflowId -import cromwell.engine.db.{CallStatus, DummyDataAccess} +import cromwell.engine.db.DataAccess import cromwell.util.SampleWdl.ThreeStep -import scala.concurrent.Future +import scala.concurrent.Await +import scala.concurrent.duration.Duration import scala.language.postfixOps class SingleWorkflowRunnerActorSpec extends CromwellTestkitSpec("SingleWorkflowRunnerActorSpec") { - val dataAccess = new DummyDataAccess() { - override def getExecutionStatuses(workflowId: WorkflowId): Future[Map[FullyQualifiedName, CallStatus]] = { - Future.successful(Seq("ps", "cgrep", "wc").map { "three_step." + _ -> NotStarted}.toMap) - } - } + val dataAccess = DataAccess() val workflowManagerActor = system.actorOf(WorkflowManagerActor.props(dataAccess)) val props = SingleWorkflowRunnerActor.props(ThreeStep.wdlSource(), ThreeStep.wdlJson, ThreeStep.rawInputs, workflowManagerActor) + override def afterAll() { + super.afterAll() + Await.result(dataAccess.shutdown(), Duration.Inf) + } + "A SingleWorkflowRunnerActor" should { "successfully run a workflow" in { waitForPattern("workflow finished with status 'Succeeded'.") { diff --git a/src/test/scala/cromwell/webservice/CromwellApiServiceIntegrationSpec.scala b/src/test/scala/cromwell/webservice/CromwellApiServiceIntegrationSpec.scala index fda4f8d301a..8e215069f5d 100644 --- a/src/test/scala/cromwell/webservice/CromwellApiServiceIntegrationSpec.scala +++ b/src/test/scala/cromwell/webservice/CromwellApiServiceIntegrationSpec.scala @@ -1,18 +1,27 @@ package cromwell.webservice -import cromwell.engine.db.DummyDataAccess +import cromwell.engine.db.DataAccess import cromwell.engine.workflow.WorkflowManagerActor import cromwell.util.SampleWdl.HelloWorld -import org.scalatest.{FlatSpec, Matchers} +import org.scalatest.{BeforeAndAfterAll, BeforeAndAfter, FlatSpec, Matchers} import spray.http.{FormData, StatusCodes} import spray.json.DefaultJsonProtocol._ import spray.json._ import spray.testkit.ScalatestRouteTest +import scala.concurrent.Await +import scala.concurrent.duration.Duration + class CromwellApiServiceIntegrationSpec extends FlatSpec with CromwellApiService with ScalatestRouteTest with Matchers { def actorRefFactory = system - val workflowManager = system.actorOf(WorkflowManagerActor.props(DummyDataAccess())) + val dataAccess = DataAccess() + val workflowManager = system.actorOf(WorkflowManagerActor.props(dataAccess)) + + override protected def afterAll() { + super.afterAll() + Await.result(dataAccess.shutdown(), Duration.Inf) + } it should "return 400 for a malformed WDL " in { Post("/workflows", FormData(Seq("wdlSource" -> CromwellApiServiceSpec.MalformedWdl , "workflowInputs" -> HelloWorld.rawInputs.toJson.toString() ))) ~>