- What is Hyperdrive-trigger?
- Requirements
- How to build and run
- User Interface
- Development
- How to contribute
Hyperdrive-trigger is an event-based workflow manager and scheduler.
A workflow is defined via the graphical interface and consists of three parts: details, sensor and jobs:
- Details - general workflow information (workflow name, project name and whether the workflow is active)
- Sensor - definition of the workflow trigger; describes when the workflow will be executed. Available sensors:
  - Kafka - waits for a Kafka message with a specific content
  - Time - time-based trigger; a Quartz cron expression or a user-friendly recurring-time definition can be used
  - Recurring - the workflow is triggered whenever the previous execution has finished
- Jobs - list of jobs. Supported job types:
  - Generic Spark Job - a Spark job deployed to Apache Hadoop YARN
  - Generic Shell Job - a job executing a user-defined shell script
The user interface provides a visualization of running workflows with the ability to monitor and troubleshoot executions.
Tested with:
| Tool | Version |
|---|---|
| OpenJDK | 1.8.0_25 |
| Scala | 2.11 |
| Maven | 3.5.4 |
| PostgreSQL | 12.2_1 |
| Spark | 2.4.4 |
| Hadoop Yarn | 2.6.4.0 |
| Tomcat | 9.0.24 |
| Docker | 2.3.0.5 |
| NPM | 6.14.4 |
| Angular CLI | 9.0.3 |
Note: Docker is not required.
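The versions above are the ones the project was tested with, not necessarily hard minimums. As a quick, hedged sanity check of a local toolchain, something along these lines can be used (assuming the tools are on the PATH):

```bash
# Print the versions of the locally installed tools and compare with the tested versions above
java -version        # OpenJDK 1.8
mvn -version         # Maven 3.5+, also reports the Java version in use
psql --version       # PostgreSQL client; the server was tested with 12.x
npm --version        # NPM 6.x
npx ng version       # Angular CLI 9.x (run inside ui/ after npm install)
docker --version     # optional; only needed for the Docker-based build
```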
Adjusted application properties have to be provided. An application properties template file can be found at `src/main/resources/application.properties`. Application properties that can be adjusted:
# Version of application.
version=@project.version@
# Environment where the application will be running
environment=Local
# Maximum number of workflows that can be triggered in bulk (Default value 10)
application.maximumNumberOfWorkflowsInBulkRun=10
# Unique app id, for easier application identification
appUniqueId=
# Health check settings
health.databaseConnection.timeoutMillis=120000
health.yarnConnection.testEndpoint=/cluster/cluster
health.yarnConnection.timeoutMillis=120000
# How will users authenticate. Available options: inmemory, ldap
auth.mechanism=inmemory
# If set, all users that do not have the admin role will not have access to admin-protected endpoints
auth.admin.role=ROLE_ADMIN
# INMEMORY authentication: username and password defined here will be used for authentication.
auth.inmemory.user=hyperdriver-user
auth.inmemory.password=hyperdriver-password
auth.inmemory.admin.user=hyperdriver-admin-user
auth.inmemory.admin.password=hyperdriver-admin-password
# LDAP authentication: props template that has to be defined in case of LDAP authentication
auth.ad.domain=
auth.ad.server=
auth.ldap.search.base=
auth.ldap.search.filter=
# Core properties.
# How many threads to use for each part of the "scheduler".
# Heart beat interval in milliseconds.
# Lag threshold before an instance is deactivated by another instance.
scheduler.autostart=true
scheduler.thread.pool.size=10
scheduler.sensors.thread.pool.size=20
scheduler.sensors.changedSensorsChunkQuerySize=100
scheduler.executors.thread.pool.size=30
scheduler.jobs.parallel.number=100
scheduler.heart.beat=5000
scheduler.lag.threshold=20000
# Properties used to send notifications to users.
notification.enabled=false
notification.sender.address=
notification.max.retries=5
notification.delay=0ms
spring.mail.host=
spring.mail.port=
# Kafka Service properties. Used for per-workflow Kafka consumers
kafka.consumers.cache.size=50
#Kafka sensor properties. Not all are required. Adjust according to your use case.
kafkaSource.group.id.prefix=hyper_drive_${appUniqueId}
kafkaSource.poll.duration=500
kafkaSource.always.catchup=true
kafkaSource.properties.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafkaSource.properties.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
kafkaSource.properties.max.poll.records=3
kafkaSource.properties.enable.auto.commit=false
kafkaSource.properties.auto.offset.reset=latest
kafkaSource.properties.security.protocol=
kafkaSource.properties.ssl.truststore.location=
kafkaSource.properties.ssl.truststore.password=
kafkaSource.properties.ssl.keystore.location=
kafkaSource.properties.ssl.keystore.password=
kafkaSource.properties.ssl.key.password=
kafkaSource.properties.sasl.kerberos.service.name=
kafkaSource.properties.sasl.mechanism=
kafkaSource.properties.sasl.jaas.config=
# Recurring sensor properties.
recurringSensor.maxJobsPerDuration=8
recurringSensor.duration=1h
#Spark properties. Properties used to deploy and run Spark job. Not all are required. Adjust according to your use case.
#Where spark jobs will be executed. Available options: yarn, emr.
spark.submitApi=yarn
#Submit api = YARN
sparkYarnSink.submitTimeout=160000
sparkYarnSink.master=yarn
sparkYarnSink.filesToDeploy=
sparkYarnSink.additionalConfs.spark.ui.port=
sparkYarnSink.additionalConfs.spark.executor.extraJavaOptions=
sparkYarnSink.additionalConfs.spark.driver.extraJavaOptions=
sparkYarnSink.additionalConfs.spark.driver.memory=2g
sparkYarnSink.additionalConfs.spark.executor.instances=2
sparkYarnSink.additionalConfs.spark.executor.cores=2
sparkYarnSink.additionalConfs.spark.executor.memory=2g
sparkYarnSink.additionalConfs.spark.yarn.keytab=
sparkYarnSink.additionalConfs.spark.yarn.principal=
sparkYarnSink.additionalConfs.spark.shuffle.service.enabled=true
sparkYarnSink.additionalConfs.spark.dynamicAllocation.enabled=true
#Submit api = EMR
spark.emr.clusterId=
spark.emr.filesToDeploy=
spark.emr.additionalConfs=
#Common properties for Submit api = YARN and EMR
sparkYarnSink.hadoopResourceManagerUrlBase=
sparkYarnSink.userUsedToKillJob=
spark.submit.thread.pool.size=10
#Postgresql properties for connection to trigger database
db.driver=org.postgresql.Driver
db.url=jdbc:postgresql://
db.user=
db.password=
db.keepAliveConnection=true
db.connectionPool=HikariCP
db.numThreads=4
db.skip.liquibase=false
spring.liquibase.change-log=classpath:/db_scripts/liquibase/db.changelog.yml
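As a minimal sketch of preparing a local configuration (the target path, database name and sed-based editing below are illustrative assumptions, not project conventions), the template can be copied and the most commonly changed values adjusted:

```bash
# Copy the properties template and adjust database connection settings (values are examples only)
cp src/main/resources/application.properties /tmp/application.properties
sed -i \
  -e 's|^db.url=.*|db.url=jdbc:postgresql://localhost:5432/hyperdrive_trigger|' \
  -e 's|^db.user=.*|db.user=hyperdriver|' \
  -e 's|^db.password=.*|db.password=changeme|' \
  /tmp/application.properties
```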
The Hadoop configuration directory needs to be provided as the environment variable HADOOP_CONF_DIR, and it has to be added to the web application's classpath (an example is sketched below the list).
- The environment variable can be added in `<tomcat-root>/bin/setenv.sh`, e.g. `HADOOP_CONF_DIR=/opt/hadoop`.
- To add the Hadoop configuration directory to the application classpath, append the Hadoop conf dir to the key `shared.loader` in the file `<tomcat-base>/conf/catalina.properties`, e.g. `shared.loader="/opt/hadoop"`.
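For example, assuming the Hadoop configuration files are located in /opt/hadoop, the two steps above might look like this (the Tomcat paths are placeholders):

```bash
# 1) Export HADOOP_CONF_DIR for Tomcat (path is an example)
echo 'export HADOOP_CONF_DIR=/opt/hadoop' >> <tomcat-root>/bin/setenv.sh

# 2) Put the Hadoop conf dir on the web application's classpath:
#    in <tomcat-base>/conf/catalina.properties, the shared.loader key should end up containing, e.g.
#    shared.loader="/opt/hadoop"
```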
With Feature #700: Skip dag instance creation if no new message is available in Kafka, the application needs to access files that are defined in job templates for Hyperdrive jobs. In particular, it needs to access any files specified under `reader.option.kafka` that are used to configure Kafka consumers, e.g. keystores, truststores and keytabs, under the same path as the Spark job would see them.
For example, a (resolved) job template may include
- Additional files: `/etc/config/keystore.jks#keystore.jks`
- App arguments: `reader.option.kafka.ssl.keystore.location=keystore.jks`
In this case, `/etc/config/keystore.jks` obviously needs to exist to submit the job, but additionally, `<tomcat-root-directory>/keystore.jks` needs to exist so that the web application can access the file under the same path as the Spark job would, in order to create a Kafka consumer using the same configuration as the Spark job. This may be achieved using symbolic links.
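Continuing the example above, a symbolic link such as the following would expose the keystore to the web application under the expected path (the Tomcat root directory is a placeholder):

```bash
# Make the keystore visible to the web application under the same path the Spark job uses
ln -s /etc/config/keystore.jks <tomcat-root-directory>/keystore.jks
```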
For access to HDFS, `spark.yarn.keytab` and `spark.yarn.principal` from the application properties are used for authentication. No symbolic links are required.
For development purposes, Hyperdrive-trigger can be executed as an application with an embedded Tomcat. Please check out the branch feature/embedded-tomcat-2 to use it.
To build an executable jar and execute it, use the following commands:
- Package the jar: `mvn clean package`, or without tests: `mvn clean package -DskipTests`
- Execute it: `java -jar ./target/hyperdrive-trigger-<VERSION>.jar`

Access the application at http://localhost:7123/#
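Putting these steps together, a typical local round trip might look like the following sketch (the jar name depends on the project version, and the startup wait is a rough guess):

```bash
# Build without tests, start the embedded-tomcat application, and check that it responds
mvn clean package -DskipTests
java -jar ./target/hyperdrive-trigger-<VERSION>.jar &
sleep 30                                  # rough wait for startup; adjust as needed
curl -sSf http://localhost:7123/ > /dev/null && echo "Hyperdrive-trigger is up"
```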
For local and iterative front-end development, the UI can be run using a live development server with the following commands:
- Install required packages: `cd ui && npm install`
- Start the front-end application: `cd ui && ng serve` or `cd ui && npm start`

Access the application at http://localhost:4200/#
From the project root directory, run the Docker build command
docker build -t {your-image-name:your-tag} .
The Docker image can also be built using Maven, with the [Spotify Dockerfile Maven plugin](https://github.com/spotify/dockerfile-maven).
From the project root directory, run the Maven install command with the docker profile enabled (see below):
# Skip unit and integration tests, activate the "docker" profile,
# and set the name prefix of the final Docker image(s)
mvn clean install -DskipTests -Ddocker -Ddockerfile.repositoryUrl=my
This will create a Docker image with the name `my/hyperdrive-trigger`, tagged as `{project-version}-{commit-id}` and `latest`, e.g. `my/hyperdrive-trigger:0.5.3-SNAPSHOT-6514d3f22a4dcd73a734c614db96694e7ebc6efc` and `my/hyperdrive-trigger:latest`.
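The resulting image can then be started locally; a minimal, hedged example (the container port is an assumption and should be checked against the project's Dockerfile):

```bash
# Run the image built above and expose it locally; the container port 8080 is an assumption
docker run --rm -p 8080:8080 my/hyperdrive-trigger:latest
```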
Hyperdrive-trigger can be packaged as a Web Application Archive and executed in a web server.
- Without tests: `mvn clean package -DskipTests`
- With unit tests: `mvn clean package`
To build Hyperdrive-trigger without the Hortonworks Hadoop binaries, specify the property `exclude-hortonworks`, e.g. `mvn clean package -Dexclude-hortonworks`.
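To deploy the packaged archive, the WAR file can be copied into a Tomcat instance. A minimal sketch, assuming the default Maven artifact name and a standard Tomcat layout:

```bash
# Copy the packaged WAR into Tomcat's webapps directory (artifact name and paths are assumptions)
cp target/hyperdrive-trigger-<VERSION>.war <tomcat-root>/webapps/hyperdrive-trigger.war
```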
The Liquibase Maven plugin may be used to issue Liquibase commands. To use it, copy `/etc/liquibase/liquibase-maven-plugin.properties.template` to `/etc/liquibase/liquibase-maven-plugin.properties` and modify it as needed.
Then, the Liquibase Maven plugin can be executed (see the example after this list), e.g.
- `mvn liquibase:status` to view the status
- `mvn liquibase:dropAll` to drop all tables, views etc.
- `mvn liquibase:update` to apply all pending changesets
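For example, checking which changesets are pending might look like this (the paths follow the template location mentioned above; connection settings have to be filled in manually):

```bash
# Copy the plugin configuration template and fill in the connection settings, then check the status
cp /etc/liquibase/liquibase-maven-plugin.properties.template /etc/liquibase/liquibase-maven-plugin.properties
# ... edit /etc/liquibase/liquibase-maven-plugin.properties (db url, username, password) ...
mvn liquibase:status
```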
- `mvn apache-rat:check` to verify required copyright headers
- `mvn scalafmt:format -Dformat.validateOnly=true` to validate Scala file formatting
- `mvn scalafmt:format` or `mvn scalafmt:format -Dformat.validateOnly=false` to apply correct Scala file formatting
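If desired, both checks can be combined into a single pre-push command, for example:

```bash
# Run the license-header check and validate Scala formatting in one Maven invocation
mvn apache-rat:check scalafmt:format -Dformat.validateOnly=true
```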
If required, old rows from the `job_instance` table can be moved to the `archive_job_instance` table, to reduce the table size of `job_instance` and thereby speed up queries. The rows are first copied to the destination table and then deleted from the source table. Along with job instances, referenced `dag_instance`s and `event`s are archived in their respective tables as well. Importantly, the `job_parameters` column of `job_instance` is not archived, but discarded.
The archival process can be executed with the following DB procedure:
CALL archive_dag_instances(
i_to_ts => (now() - interval '3 months')::timestamp,
i_max_records => 200000,
i_chunk_size => 10000
);
This would archive all dag instances (and referenced job instances and events) that were created more than 3 months ago, as specified by the i_to_ts parameter, up to the given maximum number of records and in the given chunk size. It is advisable to run this query while the database is in maintenance mode, but it can also be run otherwise.
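A sketch of invoking the procedure non-interactively with psql, where the connection parameters are placeholders:

```bash
# Invoke the archival procedure via psql; host, database and user are placeholders
psql -h <db-host> -d <db-name> -U <db-user> -c \
  "CALL archive_dag_instances(
     i_to_ts => (now() - interval '3 months')::timestamp,
     i_max_records => 200000,
     i_chunk_size => 10000
   );"
```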
Please see our Contribution Guidelines.