Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updates to fix coercion errors. DSDEEPB-727 #84

Merged
merged 1 commit into from
Jul 8, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ database {

main {
hsqldb {
url = "jdbc:hsqldb:mem:tempdb;shutdown=false"
url = "jdbc:hsqldb:mem:${slick.uniqueSchema};shutdown=false"
driver = "org.hsqldb.jdbcDriver"
slick.driver = "slick.driver.HsqldbDriver"
slick.createSchema = true
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/cromwell/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ object Main extends App {
case e: Exception =>
println("Unable to process inputs")
e.printStackTrace()
} finally {
workflowManagerSystem.shutdown() // Future[Unit], but give it a shot
}
}

Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/cromwell/binding/WdlNamespace.scala
Original file line number Diff line number Diff line change
Expand Up @@ -421,5 +421,7 @@ case class WdlNamespace(ast: Ast, source: WdlSource, importResolver: ImportResol
}
}

override def toRawString = ???

validate()
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,6 @@ case object WdlBooleanType extends WdlType {
case s: JsString if s.value.equalsIgnoreCase("false") => WdlBoolean.False
case s: JsBoolean => WdlBoolean(s.value)
}
}

override def fromRawString(rawString: String) = WdlBoolean(rawString.toBoolean)
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,6 @@ case object WdlExpressionType extends WdlType {
override protected def coercion = {
case s: String if s.startsWith("%expr:") => WdlExpression.fromString(s.replace("%expr:", ""))
}
}

override def fromRawString(rawString: String) = WdlExpression.fromString(rawString)
}
2 changes: 2 additions & 0 deletions src/main/scala/cromwell/binding/types/WdlFileType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ case object WdlFileType extends WdlType {
case s: String => WdlFile(s)
case s: JsString => WdlFile(s.value)
}

override def fromRawString(rawString: String) = WdlFile(rawString)
}
3 changes: 2 additions & 1 deletion src/main/scala/cromwell/binding/types/WdlFloatType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ case object WdlFloatType extends WdlType {
case d: Double => WdlFloat(d)
case n: JsNumber => WdlFloat(n.value.doubleValue())
}
}

override def fromRawString(rawString: String) = WdlFloat(rawString.toFloat)
}
6 changes: 4 additions & 2 deletions src/main/scala/cromwell/binding/types/WdlIntegerType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ case object WdlIntegerType extends WdlType {

override protected def coercion = {
case i: Integer => WdlInteger(i)
case n: JsNumber => WdlInteger(n.value.intValue)
case n: JsNumber => WdlInteger(n.value.intValue())
}
}

override def fromRawString(rawString: String) = WdlInteger(rawString.toInt)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ package cromwell.binding.types
case object WdlNamespaceType extends WdlType {
override def toWdlString: String = "Namespace"
override protected def coercion = ???
override def fromRawString(rawString: String) = ???
}
2 changes: 2 additions & 0 deletions src/main/scala/cromwell/binding/types/WdlObjectType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ case object WdlObjectType extends WdlType {
val toWdlString: String = "Object"

override protected def coercion = ???

override def fromRawString(rawString: String) = ???
}
2 changes: 2 additions & 0 deletions src/main/scala/cromwell/binding/types/WdlStringType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ case object WdlStringType extends WdlType {
case s: String => WdlString(s)
case s: JsString => WdlString(s.value)
}

override def fromRawString(rawString: String) = WdlString(rawString)
}
7 changes: 7 additions & 0 deletions src/main/scala/cromwell/binding/types/WdlType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,13 @@ trait WdlType {
}

def toWdlString: String

/**
* Converts rawString generated from WdlValue.toRawString back to a WdlValue.
* @param rawString Raw string generated by WdlValue.toRawString
* @return The WdlValue
*/
def fromRawString(rawString: String): WdlValue
}

object WdlType {
Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/cromwell/binding/values/WdlObject.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,6 @@ import cromwell.binding.types.WdlObjectType

case class WdlObject(value: Map[String, WdlValue]) extends WdlValue {
val wdlType = WdlObjectType

override def toRawString = ???
}
4 changes: 1 addition & 3 deletions src/main/scala/cromwell/binding/values/WdlPrimitive.scala
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package cromwell.binding.values

import scala.util.Try

trait WdlPrimitive extends WdlValue {
def asString: String
override def toRawString = Try(asString)
override def toRawString = asString
}
3 changes: 2 additions & 1 deletion src/main/scala/cromwell/binding/values/WdlValue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,6 @@ trait WdlValue {
def not: Try[WdlValue] = invalid(s"!$this")
def unaryPlus: Try[WdlValue] = invalid(s"+$this")
def unaryMinus: Try[WdlValue] = invalid(s"-$this")
def toRawString: Try[String] = Try(toString)
def toRawString: String = toString
def typeName: String = wdlType.getClass.getSimpleName
}
23 changes: 22 additions & 1 deletion src/main/scala/cromwell/engine/db/DataAccess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,30 @@ import cromwell.binding.{Call, FullyQualifiedName, WdlJson, WdlSource}
import cromwell.engine.backend.Backend
import cromwell.engine.{SymbolStoreEntry, WorkflowId, WorkflowState}

import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}

object DataAccess {
def apply(): DataAccess = new slick.SlickDataAccess()

/**
* Creates a DataAccess instance, loans it to the function,
* and then attempts to shut down the instance.
*
* @param f Function to run on the data access.
* @tparam T Return type of the function.
* @return Result of calling the function with the dataAccess instance.
*/
def withDataAccess[T](f: DataAccess => T): T = {
val dataAccess = DataAccess()
try {
f(dataAccess)
} finally {
// NOTE: shutdown result thrown away
Await.ready(dataAccess.shutdown(), Duration.Inf)
}
}

// TODO PLEASE RENAME ME
case class WorkflowInfo(workflowId: WorkflowId, wdlSource: WdlSource, wdlJson: WdlJson)
}
Expand Down Expand Up @@ -54,4 +73,6 @@ trait DataAccess {

def getExecutionStatuses(workflowId: WorkflowId): Future[Map[FullyQualifiedName, CallStatus]]

/** Shutdown. NOTE: Should (internally or explicitly) use AsyncExecutor.shutdownExecutor. */
def shutdown(): Future[Unit]
}

This file was deleted.

49 changes: 40 additions & 9 deletions src/main/scala/cromwell/engine/db/slick/SlickDataAccess.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import java.util.{Date, UUID}
import javax.sql.rowset.serial.SerialClob

import _root_.slick.util.ConfigExtensionMethods._
import com.typesafe.config.Config
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import cromwell.binding._
import cromwell.binding.types.WdlType
import cromwell.binding.values.WdlValue
import cromwell.engine._
import cromwell.engine.backend.Backend
import cromwell.engine.backend.local.LocalBackend
Expand Down Expand Up @@ -36,6 +35,32 @@ object SlickDataAccess {
implicit class StringToClob(val str: String) extends AnyVal {
def toClob: Clob = new SerialClob(str.toCharArray)
}

implicit class ConfigWithUniqueSchema(val config: Config) extends AnyVal {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm behind the times, can you explain what this is doing (and why)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function below (optionally) inserts a unique string into the config value url, that is seconds later used internally by slick to connect to the database.

When this code snippet activates for the default in memory database (see also reference.conf) this allows two things:

First, quick google searches weren't clear on a one-liner way to make slick behave like liquibase, only updating a schema. If special checks weren't added, each time the schema creation ran, after the first time, it would bomb out on "table already exists". There were a couple of search hits on how to peek in the database to see if the tables existed, but I ultimately backed off on this because...

Secondly, providing a clean, unique database each call to DataAccess()/new SlickDataAccess() leaves the behavior SlickDataAccess similar to the old DummyDataAccess. The coercion bug wasn't discovered earlier as DummyDataAccess was being used for tests. Now they can just use DataAccess() and a clean database will be created for the test.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha. Can you leave some commentary here describing the situation a bit more?

/**
* Modifies config.getString("url") to return a unique schema, if the original url contains the text
* "${slick.uniqueSchema}".
*
* This allows each instance of a SlickDataAccess object to use a clean, and different, in memory database.
*
* @return Config with ${slick.uniqueSchema} in url replaced with a unique string.
*/
def withUniqueSchema: Config = {
val url = config.getString("url")
if (url.contains("${slick.uniqueSchema}")) {
// Config wasn't updating with a simple withValue/withFallback.
// So instead, do a bit of extra work to insert the generated schema name in the url.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wish I knew Typesafe Config well enough to suggest a workaround... 😢

val schema = UUID.randomUUID().toString
val newUrl = url.replaceAll("""\$\{slick\.uniqueSchema\}""", schema)
val origin = "url with slick.uniqueSchema=" + schema
val urlConfigValue = ConfigValueFactory.fromAnyRef(newUrl, origin)
val urlConfig = ConfigFactory.empty(origin).withValue("url", urlConfigValue)
urlConfig.withFallback(config)
} else {
config
}
}
}
}

class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponent) extends DataAccess {
Expand All @@ -56,7 +81,7 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen
import dataAccess.driver.api._

// NOTE: if you want to refactor database is inner-class type: this.dataAccess.driver.backend.DatabaseFactory
val database = Database.forConfig("", databaseConfig)
val database = Database.forConfig("", databaseConfig.withUniqueSchema)

// Possibly create the database
{
Expand All @@ -70,11 +95,18 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen
// used in key specification without a key length
//
// Perhaps we'll use a more optimized data type for UUID's bytes in the future, as a FK, instead auto-inc cols
//
// The value `${slick.uniqueSchema}` may be used in the url, in combination with `slick.createSchema = true`, to
// generate unique schema instances that don't conflict.
//
// Otherwise, create one DataAccess and hold on to the reference.
if (databaseConfig.getBooleanOr("slick.createSchema")) {
Await.result(database.run(dataAccess.schema.create), Duration.Inf)
}
}

override def shutdown() = database.shutdown

// Run action with an outer transaction
private def runTransaction[R](action: DBIOAction[R, _ <: NoStream, _ <: Effect]): Future[R] = {
database.run(action.transactionally)
Expand Down Expand Up @@ -127,7 +159,7 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen
symbol.key.iteration.getOrElse(IterationNone),
if (symbol.isInput) IoInput else IoOutput,
symbol.wdlType.toWdlString,
symbol.wdlValue.map(_.toRawString.get))
symbol.wdlValue.map(_.toRawString))
}
}

Expand Down Expand Up @@ -341,8 +373,7 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen
new SymbolStoreEntry(
symbolStoreKey,
wdlType,
symbolResult.wdlValue.map(wdlType.coerceRawValue(_).get))

symbolResult.wdlValue.map(wdlType.fromRawString))
}

} yield symbolStoreEntries
Expand All @@ -351,7 +382,7 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen
}

/** Should fail if a value is already set. The keys in the Map are locally qualified names. */
override def setOutputs(workflowId: WorkflowId, call: Call, callOutputs: Map[String, WdlValue]): Future[Unit] = {
override def setOutputs(workflowId: WorkflowId, call: Call, callOutputs: WorkflowOutputs): Future[Unit] = {

val action = for {
workflowExecution <- dataAccess.workflowExecutionsByWorkflowExecutionUuid(workflowId.toString).result.head
Expand All @@ -360,11 +391,11 @@ class SlickDataAccess(databaseConfig: Config, val dataAccess: DataAccessComponen
new Symbol(
workflowExecution.workflowExecutionId.get,
call.fullyQualifiedName,
call.fullyQualifiedName + "." + symbolLocallyQualifiedName,
symbolLocallyQualifiedName,
IterationNone,
IoOutput,
wdlValue.wdlType.toWdlString,
Option(wdlValue.toRawString.get))
Option(wdlValue.toRawString))
}
} yield ()

Expand Down
11 changes: 6 additions & 5 deletions src/main/scala/cromwell/server/CromwellServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import akka.pattern.ask
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import com.wordnik.swagger.model.ApiInfo
import cromwell.engine.db.DataAccess
import cromwell.webservice.{CromwellApiService, CromwellApiServiceActor, SwaggerService}
import spray.can.Http

Expand All @@ -18,7 +17,12 @@ import scala.util.{Failure, Success}
// Note that as per the language specification, this is instantiated lazily and only used when necessary (i.e. server mode)
object CromwellServer extends DefaultWorkflowManagerSystem {
val conf = ConfigFactory.parseFile(new File("/etc/cromwell.conf"))
private lazy val realDataAccess = DataAccess()

// NOTE: Currently the this.dataAccess is passed in to this.workflowManagerActor.
// The actor could technically restart with the same instance of the dataAccess,
// So, we're not shutting down dataAccess during this.workflowManagerActor.postStop() nor this.service.postStop().
// Not sure otherwise when this server is really shutting down, so this.dataAccess currently never explicitly closed.
// Shouldn't be an issue unless perhaps test code tries to launch multiple servers and leaves dangling connections.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, we have some lifecycle stuff to figure out. Thanks for documenting what's happening here. 😄


val swaggerConfig = conf.getConfig("swagger")
val swaggerService = new SwaggerService(
Expand Down Expand Up @@ -51,7 +55,4 @@ object CromwellServer extends DefaultWorkflowManagerSystem {
case _ =>
actorSystem.log.info("Cromwell service started...")
}

override def dataAccess: DataAccess = realDataAccess
}

Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
package cromwell.server

import cromwell.engine.db.DataAccess

case class DefaultWorkflowManagerSystem() extends WorkflowManagerSystem {
def dataAccess: DataAccess = DataAccess()
}
12 changes: 10 additions & 2 deletions src/main/scala/cromwell/server/WorkflowManagerSystem.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,13 @@ trait WorkflowManagerSystem {
implicit val actorSystem = ActorSystem(systemName)
lazy val workflowManagerActor = actorSystem.actorOf(WorkflowManagerActor.props(dataAccess))

def dataAccess: DataAccess
}
// Lazily created as the primary consumer is the workflowManagerActor.
private lazy val dataAccess: DataAccess = DataAccess()

/**
* Should be called after the system is no longer in use.
*
* @return Non-blocking future that will eventually shut down this instance or return an error.
*/
def shutdown() = dataAccess.shutdown()
}
9 changes: 4 additions & 5 deletions src/test/scala/cromwell/MainSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,16 @@ package cromwell
import akka.actor.ActorSystem
import akka.testkit.EventFilter
import com.typesafe.config.ConfigFactory
import cromwell.engine.db.DummyDataAccess
import cromwell.engine.workflow.WorkflowManagerActor
import cromwell.server.WorkflowManagerSystem
import cromwell.util.FileUtil
import cromwell.util.SampleWdl.ThreeStep
import org.scalatest.{FlatSpec, Matchers}

import scala.concurrent.Await
import scala.concurrent.duration.Duration

class TestWorkflowManagerSystem extends WorkflowManagerSystem {
override def dataAccess = DummyDataAccess()
override implicit val actorSystem = ActorSystem(systemName, ConfigFactory.parseString(CromwellTestkitSpec.ConfigText))
override lazy val workflowManagerActor = actorSystem.actorOf(WorkflowManagerActor.props(dataAccess))
}

class MainSpec extends FlatSpec with Matchers {
Expand Down Expand Up @@ -102,12 +101,12 @@ class MainSpec extends FlatSpec with Matchers {
}

it should "run" in {
val stream = new java.io.ByteArrayOutputStream()
val workflowManagerSystem = new TestWorkflowManagerSystem
implicit val system = workflowManagerSystem.actorSystem
EventFilter.info(pattern = s"workflow finished", occurrences = 1).intercept {
Main.run(Array(wdlFilePathAndWriter._1.toAbsolutePath.toString, inputsJsonPathAndWriter._1.toAbsolutePath.toString), workflowManagerSystem)
}
Await.result(workflowManagerSystem.shutdown(), Duration.Inf)
}

it should "print usage" in {
Expand Down
Loading