Skip to content
This repository was archived by the owner on Feb 8, 2019. It is now read-only.

[MRQL-90] Add support for Storm streaming #30

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions bin/mrql.storm
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
#!/bin/bash
#--------------------------------------------------------------------------------
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
#--------------------------------------------------------------------------------
#
# run Apache MRQL in Storm mode using Apache Storm
#
#--------------------------------------------------------------------------------

MRQL_HOME="$(cd `dirname $0`/..; pwd -P)"

. "$MRQL_HOME/conf/mrql-env.sh"

GEN_JAR=`ls "$MRQL_HOME"/lib/mrql-gen-*.jar`
CORE_JAR=`ls "$MRQL_HOME"/lib/mrql-core-*.jar`
MRQL_JAR=`ls "$MRQL_HOME"/lib/mrql-storm-*.jar`
FULL_JAR="/tmp/${USER}_mrql_storm.jar"
CLASS_DIR="/tmp/${USER}_mrql_classes"

if [[ -z ${STORM_JARS} ]]; then
echo "*** Cannot find the Storm jar file. Need to edit mrql-env.sh"; exit -1
fi
HADOOP_JARS=${HADOOP_HOME}/share/hadoop/mapreduce/hadoop-mapreduce-client-core-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/hadoop-common-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/hdfs/hadoop-hdfs-${HADOOP_VERSION}.jar:${HADOOP_HOME}/share/hadoop/common/lib/*
LAMDA_JARS="/tmp/mrql_jar_${USER}/*"

export JAVA_HOME FS_DEFAULT_NAME BSP_MASTER_ADDRESS STORM_ZOOKEEPER_QUORUM BSP_SPLIT_INPUT

if [[ ($MRQL_JAR -nt $FULL_JAR) ]]; then
rm -rf $CLASS_DIR
mkdir -p $CLASS_DIR
pushd $CLASS_DIR > /dev/null
$JAVA_HOME/bin/jar xf $CUP_JAR
$JAVA_HOME/bin/jar xf $JLINE_JAR
$JAVA_HOME/bin/jar xf $GEN_JAR
$JAVA_HOME/bin/jar xf $CORE_JAR
$JAVA_HOME/bin/jar xf $MRQL_JAR
#$JAVA_HOME/bin/jar xf $HADOOP_JARS
#$JAVA_HOME/bin/jar xf $LAMDA_JARS
cd ..
$JAVA_HOME/bin/jar cf $FULL_JAR -C $CLASS_DIR .
popd > /dev/null
fi

if [ "$1" == "-local" ]; then
export STORM_CLASSPATH=$FULL_JAR # FOR LOCAL CLUSTER IN STORM
export storm.jar=$FULL_JAR
$STORM_HOME/bin/storm jar --config $STORM_CONFIG jar $FULL_JAR org.apache.mrql.Main -storm $*
else if [ "$1" == "-dist" ]; then
#export STORM_CLASSPATH=$FULL_JAR # FOR LOCAL CLUSTER IN STORM
#export storm.jar=$FULL_JAR
#echo $storm.jar
$STORM_HOME/bin/storm jar --config $STORM_CONFIG jar $FULL_JAR org.apache.mrql.Main -storm $*
else
#storm.jar=$FULL_JAR
#echo $storm.jar
#$STORM_HOME/bin/storm jar $FULL_JAR org.apache.mrql.Main -storm $*
#$JAVA_HOME/bin/java -classpath "$FULL_JAR:$STORM_JARS:$HADOOP_JARS" -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=1044 org.apache.mrql.Main -storm $*
$JAVA_HOME/bin/java -classpath "$FULL_JAR:$STORM_JARS:$HADOOP_JARS:$LAMDA_JARS" org.apache.mrql.Main -storm $*
fi
fi
23 changes: 19 additions & 4 deletions conf/mrql-env.sh
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ MAPRED_JOB_TRACKER=
# The HDFS namenode URI (eg, hdfs://localhost:9000/). If empty, it is the one defined in core-site.xml
FS_DEFAULT_NAME=


# Optional: Hama configuration. Supports versions 0.6.2, 0.6.3, 0.6.4, and 0.7.0, and 0.7.1
HAMA_VERSION=0.7.1
# The Hama installation directory
Expand All @@ -83,7 +82,7 @@ BSP_SPLIT_INPUT=
# Spark versions 0.8.1, 0.9.0, and 0.9.1 are supported by MRQL 0.9.0 only.
# You may use the Spark prebuilts bin-hadoop1 or bin-hadoop2 (Yarn)
# Tested in local, standalone deploy, and Yarn modes
SPARK_HOME=${HOME}/spark-1.6.2-bin-hadoop2.6
SPARK_HOME=${HOME}/spark
# URI of the Spark master node:
# to run Spark on Standalone Mode, set it to spark://`hostname`:7077
# to run Spark on a YARN cluster, set it to "yarn-client"
Expand All @@ -100,15 +99,23 @@ SPARK_EXECUTOR_MEMORY=1G


# Optional: Flink configuration. Supports version 1.0.2 and 1.0.3
FLINK_VERSION=1.0.2
FLINK_VERSION=1.1.2
# Flink installation directory
FLINK_HOME=${HOME}/flink-${FLINK_VERSION}
FLINK_HOME=${HOME}/ap/flink-${FLINK_VERSION}
# number of slots per TaskManager (typically, the number of cores per node)
FLINK_SLOTS=4
# memory per TaskManager
FLINK_TASK_MANAGER_MEMORY=2048



# STORM CONFIGURATIONS
STORM_VERSION=1.0.2

#Strom installation directory
STORM_HOME=${HOME}/apache-storm-${STORM_VERSION}


# Claspaths

HAMA_JAR=${HAMA_HOME}/hama-core-${HAMA_VERSION}.jar
Expand All @@ -117,6 +124,14 @@ for I in ${FLINK_HOME}/lib/*.jar; do
FLINK_JARS=${FLINK_JARS}:$I
done

STORM_JARS=.
for I in ${STORM_HOME}/lib/*.jar; do

if [[ $I != *"log4j-over-slf4j-"* ]]
then
STORM_JARS=${STORM_JARS}:$I
fi
done

# YARN-enabled assembly jar
if [[ -d ${SPARK_HOME}/assembly/target ]]; then
Expand Down
8 changes: 7 additions & 1 deletion core/src/main/java/org/apache/mrql/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ final public class Config {
public static boolean spark_mode = false;
// true, for Flink mode
public static boolean flink_mode = false;
// true, for Storm mode
public static boolean storm_mode = false;
// if true, it process the input interactively
public static boolean interactive = true;
// compile the MR functional arguments to Java bytecode at run-time
Expand Down Expand Up @@ -193,7 +195,11 @@ public static Bag parse_args ( String args[], Configuration conf ) throws Except
} else if (args[i].equals("-flink")) {
flink_mode = true;
i++;
} else if (args[i].equals("-bsp_tasks")) {
}else if (args[i].equals("-storm")) {
storm_mode = true;
i++;
}
else if (args[i].equals("-bsp_tasks")) {
if (++i >= args.length && Integer.parseInt(args[i]) < 1)
throw new Error("Expected max number of bsp tasks > 1");
nodes = Integer.parseInt(args[i]);
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/java/org/apache/mrql/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,14 @@ else if (Config.spark_mode)
Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.SparkEvaluator").newInstance();
else if (Config.flink_mode)
Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.FlinkEvaluator").newInstance();
else if (Config.storm_mode)
Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.StormEvaluator").newInstance();
else // when Config.map_reduce_mode but also the default
Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.MapReduceEvaluator").newInstance();
}

public static void initialize () throws Exception {

if (Evaluator.evaluator == null) {
if (Plan.conf == null)
Plan.conf = new Configuration();
Expand All @@ -86,8 +89,9 @@ public static void main ( String[] args ) throws Exception {
Config.bsp_mode |= arg.equals("-bsp");
Config.spark_mode |= arg.equals("-spark");
Config.flink_mode |= arg.equals("-flink");
Config.storm_mode |= arg.equals("-storm");
};
Config.map_reduce_mode = !Config.bsp_mode && !Config.spark_mode && !Config.flink_mode;
Config.map_reduce_mode = !Config.bsp_mode && !Config.spark_mode && !Config.flink_mode && !Config.storm_mode;
initialize_evaluator();
if (Config.hadoop_mode) {
conf = Evaluator.evaluator.new_configuration();
Expand Down Expand Up @@ -117,6 +121,8 @@ else if (Config.distributed_mode)
System.out.println("Spark mode using "+Config.nodes+" tasks)");
else if (Config.flink_mode)
System.out.println("Flink mode using "+Config.nodes+" tasks)");
else if (Config.storm_mode)
System.out.println("Storm mode using "+Config.nodes+" tasks)");
else if (Config.bsp_mode)
System.out.println("Hama BSP mode over "+Config.nodes+" BSP tasks)");
else if (Config.nodes > 0)
Expand Down
2 changes: 2 additions & 0 deletions core/src/main/java/org/apache/mrql/Plan.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ final static void distribute_compiled_arguments ( Configuration conf ) {
conf.set("mrql.jar.path",Compiler.jar_path);
else if (Config.spark_mode)
conf.set("mrql.jar.path",local_path.toString());
else if (Config.storm_mode)
conf.set("mrql.jar.path",Compiler.jar_path);
else {
// distribute the jar file with the compiled arguments to all clients
Path hdfs_path = new Path("mrql-tmp/class"+random_generator.nextInt(1000000)+".jar");
Expand Down
6 changes: 3 additions & 3 deletions core/src/test/java/org/apache/mrql/QueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@
public abstract class QueryTest extends TestCase {
private static String TEST_QUERY_DIR = "../tests/queries";
private static String TEST_RESULT_DIR = "../tests/results";
private File queryDir;
private File resultDir;
protected File queryDir;
protected File resultDir;
private static Evaluator evaluator;

@BeforeClass
Expand Down Expand Up @@ -154,7 +154,7 @@ public void testXml() throws Exception {
assertEquals(0, queryAndCompare(new File(queryDir, "xml_2.mrql"), resultDir));
}

private int queryAndCompare ( File query, File resultDir ) throws Exception {
protected int queryAndCompare ( File query, File resultDir ) throws Exception {
System.err.println("Testing "+query);
Translator.global_reset();
String qname = query.getName();
Expand Down
3 changes: 3 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
<spark.version>1.6.2</spark.version>
<scala.version>2.10</scala.version>
<flink.version>1.0.3</flink.version>
<storm.version>1.0.2</storm.version>
<skipTests>true</skipTests>
</properties>

Expand All @@ -60,6 +61,7 @@
<module>bsp</module>
<module>spark</module>
<module>flink</module>
<module>storm</module>
<module>dist</module>
</modules>

Expand Down Expand Up @@ -154,6 +156,7 @@
<exclude>.classpath/**</exclude>
<exclude>.project/**</exclude>
<exclude>tests/data/**</exclude>
<exclude>queries/data/**</exclude>
<exclude>tests/results/**</exclude>
<exclude>tests/error_log.txt</exclude>
<exclude>**/dependency-reduced-pom.xml</exclude>
Expand Down
7 changes: 7 additions & 0 deletions queries/data/customers.tbl
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
1|Ramesh|32|Ahmedabad|2000.00|
2|Ramesh|25|Delhi|1500.00|
3|kaushik|23|Kota|2000.00|
4|kaushik|25|Mumbai|6500.00|
5|Hardik|27|Bhopal|8500.00|
6|Komal|22|MP|4500.00|
7|Muffy|24|Indore|10000.00|
22 changes: 22 additions & 0 deletions queries/groupby_stream_storm.mrql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
C = stream(line,"queries/data/","|",type(<CUSTKEY:int,NAME:string,AGE:int,ADDRESS:string,SALARY:float>));

SELECT (k,sum(c.SALARY))
FROM c in C
GROUP BY k: c.NAME;
Loading