diff --git a/project/Build.scala b/project/Build.scala index da5ee670a..8c6a57d3c 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -220,6 +220,22 @@ object SummingbirdBuild extends Build { summingbirdBatch ) + lazy val summingbirdStormTest = module("storm-test").settings( + parallelExecution in Test := false, + libraryDependencies ++= Seq( + "com.twitter" %% "algebird-core" % algebirdVersion, + "com.twitter" %% "bijection-core" % bijectionVersion, + "com.twitter" %% "storehaus-core" % storehausVersion, + "com.twitter" %% "storehaus-algebra" % storehausVersion, + "com.twitter" %% "tormenta-core" % tormentaVersion, + withCross("com.twitter" %% "util-core" % utilVersion), + "storm" % "storm" % "0.9.0-wip15" % "provided" + ) + ).dependsOn( + summingbirdCore % "test->test;compile->compile", + summingbirdStorm + ) + lazy val summingbirdScalding = module("scalding").settings( libraryDependencies ++= Seq( "com.backtype" % "dfs-datastores" % dfsDatastoresVersion, diff --git a/summingbird-storm-test/src/main/scala/com/twitter/summingbird/storm/StormTestRun.scala b/summingbird-storm-test/src/main/scala/com/twitter/summingbird/storm/StormTestRun.scala new file mode 100644 index 000000000..6daac622e --- /dev/null +++ b/summingbird-storm-test/src/main/scala/com/twitter/summingbird/storm/StormTestRun.scala @@ -0,0 +1,105 @@ +/* + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package com.twitter.summingbird.storm + +import com.twitter.algebird.Semigroup +import backtype.storm.{ Config => BacktypeStormConfig, LocalCluster, Testing } +import backtype.storm.testing.{ CompleteTopologyParam, MockedSources } +import com.twitter.summingbird.storm.spout.TraversableSpout +import com.twitter.summingbird._ +import com.twitter.summingbird.planner._ +import com.twitter.tormenta.spout.Spout +import scala.collection.JavaConverters._ +import java.security.Permission + + +/** + * This stops Storm's exception handling triggering an exit(1) + */ +private[storm] class MySecurityManager extends SecurityManager { + override def checkExit(status: Int): Unit = { + throw new SecurityException(); + } + override def checkAccess(t: Thread) = {} + override def checkPermission(p: Permission) = {} +} + +/* + * This is a wrapper to run a storm topology. + * We use the SecurityManager code to catch the System.exit storm calls when it + * fails. We wrap it into a normal exception instead so it can report better/retry. + */ + +object StormTestRun { + private def completeTopologyParam(conf: BacktypeStormConfig) = { + val ret = new CompleteTopologyParam() + ret.setMockedSources(new MockedSources) + ret.setStormConf(conf) + ret.setCleanupState(false) + ret + } + + + private def tryRun(plannedTopology: PlannedTopology): Unit = { + //Before running the external Command + val oldSecManager = System.getSecurityManager() + System.setSecurityManager(new MySecurityManager()); + try { + val cluster = new LocalCluster() + Testing.completeTopology(cluster, plannedTopology.topology, completeTopologyParam(plannedTopology.config)) + // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667 + Thread.sleep(1000) + cluster.shutdown + } finally { + System.setSecurityManager(oldSecManager) + } + } + + def apply(graph: TailProducer[Storm, Any])(implicit storm: Storm) { + val topo = storm.plan(graph) + apply(topo) + } + + def simpleRun[T, K, V: Semigroup](original: List[T], mkJob: (Producer[Storm, T], Storm#Store[K, V]) => TailProducer[Storm, Any]) + : TestStore[K, V] = { + + implicit def extractor[T]: TimeExtractor[T] = TimeExtractor(_ => 0L) + + val (id, store) = TestStore.createStore[K, V]() + + val job = mkJob( + Storm.source(TraversableSpout(original)), + store + ) + + implicit val s = Storm.local(Map()) + apply(job) + TestStore[K, V](id).getOrElse(sys.error("Error running test, unable to find store at the end")) + } + + def apply(plannedTopology: PlannedTopology) { + this.synchronized { + try { + tryRun(plannedTopology) + } catch { + case _: Throwable => + Thread.sleep(3000) + tryRun(plannedTopology) + } + } + } +} diff --git a/summingbird-storm-test/src/main/scala/com/twitter/summingbird/storm/TestStore.scala b/summingbird-storm-test/src/main/scala/com/twitter/summingbird/storm/TestStore.scala new file mode 100644 index 000000000..aa6ddb566 --- /dev/null +++ b/summingbird-storm-test/src/main/scala/com/twitter/summingbird/storm/TestStore.scala @@ -0,0 +1,109 @@ +/* + Copyright 2013 Twitter, Inc. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package com.twitter.summingbird.storm + +import com.twitter.algebird.Semigroup +import com.twitter.storehaus.algebra.MergeableStore +import com.twitter.summingbird.batch.{BatchID, Batcher} +import com.twitter.util.Future +import java.util.{Collections, HashMap, Map => JMap, UUID} +import java.util.concurrent.atomic.AtomicInteger +import scala.collection.mutable.SynchronizedMap +import java.util.WeakHashMap +import scala.collection.JavaConverters._ + + + +object TestStore { + private val testStores = new WeakHashMap[String, TestStore[_, _]] + + def apply[K, V: Semigroup](storeID: String): Option[TestStore[K, V]] = + (Option(testStores.get(storeID)).map{s => + s.asInstanceOf[TestStore[K, V]]}) + + private def buildStore[K, V: Semigroup](initialData: Map[K, V]) : String = { + val storeID = UUID.randomUUID.toString + val newInitStore = TestStore[K, V](storeID, initialData) + testStores.synchronized { + testStores.put(storeID, newInitStore) + } + storeID + } + + def createBatchedStore[K, V] + (initialData: Map[(K, BatchID), V] = Map.empty[(K, BatchID), V]) + (implicit batcher: Batcher, valueSG: Semigroup[V]): + (String, MergeableStoreSupplier[K, V]) = { + + val storeID = buildStore[(K, BatchID), V](initialData) + val supplier = MergeableStoreSupplier.from( + TestStore.apply[(K, BatchID), V](storeID) + .getOrElse(sys.error("Weak hash map no longer contains store")) + ) + (storeID, supplier) + } + + def createStore[K, V: Semigroup](initialData: Map[K, V] = Map.empty[K, V]): + (String, MergeableStoreSupplier[K, V]) = { + val storeID = buildStore[K, V](initialData) + val supplier = MergeableStoreSupplier.fromOnlineOnly( + TestStore.apply[K, V](storeID) + .getOrElse(sys.error("Weak hash map no longer contains store")) + ) + + (storeID, supplier) + } +} + + +case class TestStore[K, V: Semigroup](storeID: String, initialData: Map[K, V]) extends MergeableStore[K, V] { + private val backingStore: JMap[K, Option[V]] = + Collections.synchronizedMap(new HashMap[K, Option[V]]()) + val updates: AtomicInteger = new AtomicInteger(0) + val reads: AtomicInteger = new AtomicInteger(0) + + def toScala: Map[K, V] = backingStore.asScala.collect{ case (k, Some(v)) => (k, v)}.toMap + + private def getOpt(k: K) = { + reads.incrementAndGet + Option(backingStore.get(k)).flatMap(i => i) + } + + val semigroup = implicitly[Semigroup[V]] + + override def get(k: K) = Future.value(getOpt(k)) + + override def put(pair: (K, Option[V])) = { + val (k, optV) = pair + if (optV.isDefined) + backingStore.put(k, optV) + else + backingStore.remove(k) + updates.incrementAndGet + Future.Unit + } + + override def merge(pair: (K, V)) = { + val (k, v) = pair + val oldV = getOpt(k) + val newV = Semigroup.plus(Some(v), oldV) + updates.incrementAndGet + backingStore.put(k, newV) + Future.value(oldV) + } + +} diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/InjectionLaws.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/InjectionLaws.scala similarity index 100% rename from summingbird-storm/src/test/scala/com/twitter/summingbird/storm/InjectionLaws.scala rename to summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/InjectionLaws.scala diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala similarity index 63% rename from summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala rename to summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala index dcccbc809..0fcc43b60 100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala +++ b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/StormLaws.scala @@ -49,70 +49,6 @@ import java.security.Permission * Tests for Summingbird's Storm planner. */ -/** - * State required to perform a single Storm test run. - */ -case class TestState[T, K, V]( - store: JMap[(K, BatchID), Option[V]] = Collections.synchronizedMap(new HashMap[(K, BatchID), Option[V]]()), - used: ArrayBuffer[T] = new ArrayBuffer[T] with SynchronizedBuffer[T], - placed: AtomicInteger = new AtomicInteger -) - -object TrueGlobalState { - val data = new MutableHashMap[String, TestState[Int, Int, Int]] - with SynchronizedMap[String, TestState[Int, Int, Int]] -} - - class MySecurityManager extends SecurityManager { - override def checkExit(status: Int): Unit = { - throw new SecurityException(); - } - override def checkAccess(t: Thread) = {} - override def checkPermission(p: Permission) = {} - } - -/* - * This is a wrapper to run a storm topology. - * We use the SecurityManager code to catch the System.exit storm calls when it - * fails. We wrap it into a normal exception instead so it can report better/retry. - */ - -object StormRunner { - private def completeTopologyParam(conf: BacktypeStormConfig) = { - val ret = new CompleteTopologyParam() - ret.setMockedSources(new MockedSources) - ret.setStormConf(conf) - ret.setCleanupState(false) - ret - } - - private def tryRun(plannedTopology: PlannedTopology): Unit = { - //Before running the external Command - val oldSecManager = System.getSecurityManager() - System.setSecurityManager(new MySecurityManager()); - try { - val cluster = new LocalCluster() - Testing.completeTopology(cluster, plannedTopology.topology, completeTopologyParam(plannedTopology.config)) - // Sleep to prevent this race: https://github.com/nathanmarz/storm/pull/667 - Thread.sleep(1000) - cluster.shutdown - } finally { - System.setSecurityManager(oldSecManager) - } - } - - def run(plannedTopology: PlannedTopology) { - this.synchronized { - try { - tryRun(plannedTopology) - } catch { - case _: Throwable => - Thread.sleep(3000) - tryRun(plannedTopology) - } - } - } -} object StormLaws extends Specification { sequential @@ -124,77 +60,19 @@ object StormLaws extends Specification { implicit def extractor[T]: TimeExtractor[T] = TimeExtractor(_ => 0L) implicit val batcher = Batcher.unit - /** - * Global state shared by all tests. - */ - val globalState = TrueGlobalState.data - - /** - * Returns a MergeableStore that routes get, put and merge calls - * through to the backing store in the proper globalState entry. - */ - def testingStore(id: String) = - new MergeableStore[(Int, BatchID), Int] with java.io.Serializable { - val semigroup = implicitly[Semigroup[Int]] - def wrappedStore = globalState(id).store - private def getOpt(k: (Int, BatchID)) = Option(wrappedStore.get(k)).flatMap(i => i) - override def get(k: (Int, BatchID)) = Future.value(getOpt(k)) - override def put(pair: ((Int, BatchID), Option[Int])) = { - val (k, optV) = pair - if (optV.isDefined) - wrappedStore.put(k, optV) - else - wrappedStore.remove(k) - globalState(id).placed.incrementAndGet - Future.Unit - } - override def merge(pair: ((Int, BatchID), Int)) = { - val (k, v) = pair - val oldV = getOpt(k) - val newV = Semigroup.plus(Some(v), oldV) - wrappedStore.put(k, newV) - globalState(id).placed.incrementAndGet - Future.value(oldV) - } - } - val testFn = sample[Int => List[(Int, Int)]] - val storm = Storm.local(Map( - )) + implicit val storm = Storm.local(Map()) def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get - def genStore: (String, Storm#Store[Int, Int]) = { - val id = UUID.randomUUID.toString - globalState += (id -> TestState()) - val store = MergeableStoreSupplier(() => testingStore(id), Batcher.unit) - (id, store) - } + def genStore: (String, Storm#Store[Int, Int]) = TestStore.createStore[Int, Int]() def genSink:() => ((Int) => Future[Unit]) = () => {x: Int => append(x) Future.Unit } - /** - * Perform a single run of TestGraphs.singleStepJob using the - * supplied list of integers and the testFn defined above. - */ - def runOnce(original: List[Int])(mkJob: (Producer[Storm, Int], Storm#Store[Int, Int]) => TailProducer[Storm, Any]) - : TestState[Int, Int, Int] = { - - val (id, store) = genStore - - val job = mkJob( - Storm.source(TraversableSpout(original)), - store - ) - val topo = storm.plan(job) - StormRunner.run(topo) - globalState(id) - } - def memoryPlanWithoutSummer(original: List[Int])(mkJob: (Producer[Memory, Int], Memory#Sink[Int]) => TailProducer[Memory, Int]) : List[Int] = { val memory = new Memory @@ -225,13 +103,10 @@ object StormLaws extends Specification { Storm.sink[Int]({ (x: Int) => append(x); Future.Unit }) ) - val topo = storm.plan(job) - StormRunner.run(topo) + StormTestRun(job) StormLaws.outputList.toList } - - val nextFn = { pair: ((Int, (Int, Option[Int]))) => val (k, (v, joinedV)) = pair List((k -> joinedV.getOrElse(10))) @@ -245,15 +120,13 @@ object StormLaws extends Specification { "StormPlatform matches Scala for single step jobs" in { val original = sample[List[Int]] val returnedState = - runOnce(original)( + StormTestRun.simpleRun[Int, Int, Int](original, TestGraphs.singleStepJob[Storm, Int, Int, Int](_,_)(testFn) ) - Equiv[Map[Int, Int]].equiv( TestGraphs.singleStepInScala(original)(testFn), - returnedState.store.asScala.toMap - .collect { case ((k, batchID), Some(v)) => (k, v) } + returnedState.toScala ) must beTrue } @@ -261,15 +134,13 @@ object StormLaws extends Specification { val original = sample[List[Int]] val fn = {(x: Int) => List[(Int, Int)]()} val returnedState = - runOnce(original)( + StormTestRun.simpleRun[Int, Int, Int](original, TestGraphs.singleStepJob[Storm, Int, Int, Int](_,_)(fn) ) - Equiv[Map[Int, Int]].equiv( TestGraphs.singleStepInScala(original)(fn), - returnedState.store.asScala.toMap - .collect { case ((k, batchID), Some(v)) => (k, v) } + returnedState.toScala ) must beTrue } @@ -279,15 +150,14 @@ object StormLaws extends Specification { val fnB = sample[Int => List[(Int,Int)]] val returnedState = - runOnce(original)( + StormTestRun.simpleRun[Int, Int, Int](original, TestGraphs.twinStepOptionMapFlatMapJob[Storm, Int, Int, Int, Int](_,_)(fnA, fnB) ) Equiv[Map[Int, Int]].equiv( TestGraphs.twinStepOptionMapFlatMapScala(original)(fnA, fnB), - returnedState.store.asScala.toMap - .collect { case ((k, batchID), Some(v)) => (k, v) } + returnedState.toScala ) must beTrue } @@ -297,13 +167,12 @@ object StormLaws extends Specification { val fnB = sample[Int => List[(Int,Int)]] val returnedState = - runOnce(original)( + StormTestRun.simpleRun[Int, Int, Int](original, TestGraphs.twinStepOptionMapFlatMapJob[Storm, Int, Int, Int, Int](_,_)(fnA, fnB) ) Equiv[Map[Int, Int]].equiv( TestGraphs.twinStepOptionMapFlatMapScala(original)(fnA, fnB), - returnedState.store.asScala.toMap - .collect { case ((k, batchID), Some(v)) => (k, v) } + returnedState.toScala ) must beTrue } @@ -314,14 +183,13 @@ object StormLaws extends Specification { expander(x).flatMap{case (k, v) => List((k, v), (k, v), (k, v), (k, v), (k, v))} } val returnedState = - runOnce(original)( + StormTestRun.simpleRun[Int, Int, Int](original, TestGraphs.singleStepJob[Storm, Int, Int, Int](_,_)(expansionFunc) ) Equiv[Map[Int, Int]].equiv( TestGraphs.singleStepInScala(original)(expansionFunc), - returnedState.store.asScala.toMap - .collect { case ((k, batchID), Some(v)) => (k, v) } + returnedState.toScala ) must beTrue } @@ -330,14 +198,13 @@ object StormLaws extends Specification { val fnA = sample[Int => List[(Int, Int)]] val fnB = sample[Int => List[Int]] val returnedState = - runOnce(original)( + StormTestRun.simpleRun[Int, Int, Int](original, TestGraphs.singleStepMapKeysJob[Storm, Int, Int, Int, Int](_,_)(fnA, fnB) ) Equiv[Map[Int, Int]].equiv( TestGraphs.singleStepMapKeysInScala(original)(fnA, fnB), - returnedState.store.asScala.toMap - .collect { case ((k, batchID), Some(v)) => (k, v) } + returnedState.toScala ) must beTrue } @@ -345,21 +212,20 @@ object StormLaws extends Specification { val original = sample[List[Int]] val staticFunc = { i: Int => List((i -> i)) } val returnedState = - runOnce(original)( + StormTestRun.simpleRun[Int, Int, Int](original, TestGraphs.leftJoinJob[Storm, Int, Int, Int, Int, Int](_, service, _)(staticFunc)(nextFn) ) Equiv[Map[Int, Int]].equiv( TestGraphs.leftJoinInScala(original)(serviceFn) (staticFunc)(nextFn), - returnedState.store.asScala.toMap - .collect { case ((k, batchID), Some(v)) => (k, v) } + returnedState.toScala ) must beTrue } "StormPlatform matches Scala for optionMap only jobs" in { val original = sample[List[Int]] - val (id, store) = genStore + val (id, storeSupplier) = genStore val cluster = new LocalCluster() @@ -367,16 +233,13 @@ object StormLaws extends Specification { Storm.source(TraversableSpout(original)) .filter(_ % 2 == 0) .map(_ -> 10) - .sumByKey(store) + .sumByKey(storeSupplier) - val topo = storm.plan(producer) - StormRunner.run(topo) + StormTestRun(producer) Equiv[Map[Int, Int]].equiv( MapAlgebra.sumByKey(original.filter(_ % 2 == 0).map(_ -> 10)), - globalState(id).store.asScala - .toMap - .collect { case ((k, batchID), Some(v)) => (k, v) } + TestStore[Int, Int](id).get.toScala ) must beTrue } @@ -407,23 +270,20 @@ object StormLaws extends Specification { val tail = TestGraphs.multipleSummerJob[Storm, Int, Int, Int, Int, Int, Int](source, store1, store2)(simpleOp, doubler, doubler) - val topo = storm.plan(tail) - StormRunner.run(topo) + StormTestRun(tail) val (scalaA, scalaB) = TestGraphs.multipleSummerJobInScala(original)(simpleOp, doubler, doubler) - val store1Map = globalState(store1Id).store.asScala.toMap - val store2Map = globalState(store2Id).store.asScala.toMap + val store1Map = TestStore[Int, Int](store1Id).get.toScala + val store2Map = TestStore[Int, Int](store2Id).get.toScala Equiv[Map[Int, Int]].equiv( scalaA, store1Map - .collect { case ((k, batchID), Some(v)) => (k, v) } ) must beTrue Equiv[Map[Int, Int]].equiv( scalaB, store2Map - .collect { case ((k, batchID), Some(v)) => (k, v) } ) must beTrue } @@ -452,20 +312,17 @@ object StormLaws extends Specification { val tail = TestGraphs.realJoinTestJob[Storm, Int, Int, Int, Int, Int, Int, Int, Int, Int](source1, source2, source3, source4, service, store1, fn1, fn2, fn3, preJoinFn, postJoinFn) - val topo = storm.plan(tail) OnlinePlan(tail).nodes.size must beLessThan(10) - - StormRunner.run(topo) + StormTestRun(tail) val scalaA = TestGraphs.realJoinTestJobInScala(original1, original2, original3, original4, serviceFn, fn1, fn2, fn3, preJoinFn, postJoinFn) - val store1Map = globalState(store1Id).store.asScala.toMap + val store1Map = TestStore[Int, Int](store1Id).get.toScala Equiv[Map[Int, Int]].equiv( scalaA, store1Map - .collect { case ((k, batchID), Some(v)) => (k, v) } ) must beTrue } diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormPlanTopology.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/StormPlanTopology.scala similarity index 100% rename from summingbird-storm/src/test/scala/com/twitter/summingbird/storm/StormPlanTopology.scala rename to summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/StormPlanTopology.scala diff --git a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala similarity index 73% rename from summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala rename to summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala index 409134d04..fe730b03d 100644 --- a/summingbird-storm/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala +++ b/summingbird-storm-test/src/test/scala/com/twitter/summingbird/storm/TopologyTests.scala @@ -56,44 +56,6 @@ object TopologyTests extends Specification { implicit def extractor[T]: TimeExtractor[T] = TimeExtractor(_ => 0L) implicit val batcher = Batcher.unit - def createGlobalState[T, K, V] = - new MutableHashMap[String, TestState[T, K, V]] - with SynchronizedMap[String, TestState[T, K, V]] - - /** - * Global state shared by all tests. - */ - val globalState = createGlobalState[Int, Int, Int] - - /** - * Returns a MergeableStore that routes get, put and merge calls - * through to the backing store in the proper globalState entry. - */ - def testingStore(id: String) = - new MergeableStore[(Int, BatchID), Int] with java.io.Serializable { - val semigroup = implicitly[Semigroup[Int]] - def wrappedStore = globalState(id).store - private def getOpt(k: (Int, BatchID)) = Option(wrappedStore.get(k)).flatMap(i => i) - override def get(k: (Int, BatchID)) = Future.value(getOpt(k)) - override def put(pair: ((Int, BatchID), Option[Int])) = { - val (k, optV) = pair - if (optV.isDefined) - wrappedStore.put(k, optV) - else - wrappedStore.remove(k) - globalState(id).placed.incrementAndGet - Future.Unit - } - override def merge(pair: ((Int, BatchID), Int)) = { - val (k, v) = pair - val oldV = getOpt(k) - val newV = Semigroup.plus(Some(v), oldV) - wrappedStore.put(k, newV) - globalState(id).placed.incrementAndGet - Future.value(oldV) - } - } - /** * The function tested below. We can't generate a function with * ScalaCheck, as we need to know the number of tuples that the @@ -101,7 +63,7 @@ object TopologyTests extends Specification { */ val testFn = { i: Int => List((i -> i)) } - val storm = Storm.local() + implicit val storm = Storm.local() def sample[T: Arbitrary]: T = Arbitrary.arbitrary[T].sample.get @@ -112,12 +74,10 @@ object TopologyTests extends Specification { def funcToPlan(mkJob: (Producer[Storm, Int], Storm#Store[Int, Int]) => TailProducer[Storm, Any]) : StormTopology = { val original = sample[List[Int]] - val id = UUID.randomUUID.toString - globalState += (id -> TestState()) val job = mkJob( Storm.source(TraversableSpout(original)), - MergeableStoreSupplier(() => testingStore(id), Batcher.unit) + TestStore.createStore[Int, Int]()._2 ) storm.plan(job).topology @@ -145,7 +105,7 @@ object TopologyTests extends Specification { val nodeName = "super dooper node" val p = Storm.source(TraversableSpout(sample[List[Int]])) .flatMap(testFn).name(nodeName) - .sumByKey(MergeableStoreSupplier(() => testingStore(UUID.randomUUID.toString), Batcher.unit)) + .sumByKey(TestStore.createStore[Int, Int]()._2) val opts = Map(nodeName -> Options().set(FlatMapParallelism(50))) val storm = Storm.local(opts) @@ -156,7 +116,7 @@ object TopologyTests extends Specification { // Tail will have 1 -, distance from there should be onwards val TDistMap = bolts.map{case (k, v) => (k.split("-").size - 1, v)} - TDistMap(1).get_common.get_parallelism_hint must_== 50 + TDistMap(1).get_common.get_parallelism_hint must_== 50 } "With 2 names in a row we take the closest name" in { @@ -164,7 +124,7 @@ object TopologyTests extends Specification { val otherNodeName = "super dooper node" val p = Storm.source(TraversableSpout(sample[List[Int]])) .flatMap(testFn).name(nodeName).name(otherNodeName) - .sumByKey(MergeableStoreSupplier(() => testingStore(UUID.randomUUID.toString), Batcher.unit)) + .sumByKey(TestStore.createStore[Int, Int]()._2) val opts = Map(otherNodeName -> Options().set(FlatMapParallelism(40)), nodeName -> Options().set(FlatMapParallelism(50))) @@ -177,7 +137,7 @@ object TopologyTests extends Specification { // Tail will have 1 -, distance from there should be onwards val TDistMap = bolts.map{case (k, v) => (k.split("-").size - 1, v)} - TDistMap(1).get_common.get_parallelism_hint must_== 50 + TDistMap(1).get_common.get_parallelism_hint must_== 50 } "If the closes doesnt contain the option we keep going" in { @@ -185,7 +145,7 @@ object TopologyTests extends Specification { val otherNodeName = "super dooper node" val p = Storm.source(TraversableSpout(sample[List[Int]])) .flatMap(testFn).name(otherNodeName).name(nodeName) - .sumByKey(MergeableStoreSupplier(() => testingStore(UUID.randomUUID.toString), Batcher.unit)) + .sumByKey(TestStore.createStore[Int, Int]()._2) val opts = Map(otherNodeName -> Options().set(SpoutParallelism(30)), nodeName -> Options().set(FlatMapParallelism(50))) @@ -197,14 +157,14 @@ object TopologyTests extends Specification { // Tail will have 1 -, distance from there should be onwards val TDistMap = bolts.map{case (k, v) => (k.split("-").size - 1, v)} - TDistMap(1).get_common.get_parallelism_hint must_== 50 + TDistMap(1).get_common.get_parallelism_hint must_== 50 } "Options propagate backwards" in { val nodeName = "super dooper node" val p = Storm.source(TraversableSpout(sample[List[Int]])) .flatMap(testFn).name(nodeName).name("Throw away name") - .sumByKey(MergeableStoreSupplier(() => testingStore(UUID.randomUUID.toString), Batcher.unit)) + .sumByKey(TestStore.createStore[Int, Int]()._2) val opts = Map(nodeName -> Options().set(FlatMapParallelism(50)).set(SpoutParallelism(30))) val storm = Storm.local(opts) @@ -214,7 +174,7 @@ object TopologyTests extends Specification { val spouts = stormTopo.get_spouts val spout = spouts.head._2 - spout.get_common.get_parallelism_hint must_== 30 + spout.get_common.get_parallelism_hint must_== 30 } "Options don't propagate forwards" in { @@ -222,7 +182,7 @@ object TopologyTests extends Specification { val otherNodeName = "super dooper node" val p = Storm.source(TraversableSpout(sample[List[Int]])) .flatMap(testFn).name(otherNodeName).name(nodeName) - .sumByKey(MergeableStoreSupplier(() => testingStore(UUID.randomUUID.toString), Batcher.unit)) + .sumByKey(TestStore.createStore[Int, Int]()._2) val opts = Map(otherNodeName -> Options().set(SpoutParallelism(30)).set(SummerParallelism(50)), nodeName -> Options().set(FlatMapParallelism(50)))