- Written in Java and scripted using JavaScript.
- Using a general-purpose scripting language for defining workflows removes most configuration boilerplate.
- Simple and robust architecture that keeps running even when Hadoop is down and picks up where it left off when it’s back up.
- Support for continuous delivery of workflows.
- Parsimonious user interface that nevertheless gives a good overview of hundreds of workflows.
- Battle-tested for years at Collective, Inc. and found practically defect-free.
The Celos UI
Celos works in conjunction with the Apache Oozie execution engine, so we’ll quickly describe Oozie before diving into Celos proper.
Oozie provides services such as distribution of jobs across a cluster, log aggregation, and command-line and Web UI tools for inspecting running jobs.
To run a job in Oozie it is packaged into a workflow directory and put into HDFS, as the following diagram shows:
Oozie requires a small XML file that describes what the job should do (in this case, call a Java main with some arguments), and then runs it on a processing node in the cluster. Artefacts in the lib directory are automatically placed on the classpath. Arguments (such as inputPath and outputPath in the above example) can be passed to the job.
Oozie XML files support many different kinds of actions besides calling Java, such as manipulating HDFS, loading data into databases, or sending emails. This is mostly out of scope for this document, but in general, Celos workflows can use all features of Oozie.
You can view all currently running jobs in the Oozie Web UI, Hue:
You can also view details of a particular job in Hue:
If we have the Oozie workflow directory in HDFS at /wordcount, hourly input data in /input/YYYY-MM-DD/HH00, and want to write output data to /output/YYYY-MM-DD/HH00, we can set up a simple Celos workflow with the ID wordcount like this:
celos.defineWorkflow({
"id": "wordcount",
"schedule": celos.hourlySchedule(),
"schedulingStrategy": celos.serialSchedulingStrategy(),
"trigger": celos.hdfsCheckTrigger("/input/${year}-${month}-${day}/${hour}00/_READY"),
"externalService": celos.oozieExternalService({
"oozie.wf.application.path": "/wordcount/workflow.xml",
"inputPath": "/input/${year}-${month}-${day}/${hour}00/",
"outputPath": "/output/${year}-${month}-${day}/${hour}00/",
})
});
A Celos workflow always has an Oozie workflow (/wordcount/workflow.xml in this case) as its "meat".
If we receive data from two sources, say two datacenters in /input/nyc and /input/lax, we can define a helper function and use it to quickly define two workflows with the IDs wordcount-nyc and wordcount-lax:
function defineWordCountWorkflow(dc) {
celos.defineWorkflow({
"id": "wordcount-" + dc,
"schedule": celos.hourlySchedule(),
"schedulingStrategy": celos.serialSchedulingStrategy(),
"trigger": celos.hdfsCheckTrigger("/input/" + dc + "/${year}-${month}-${day}/${hour}00/_READY"),
"externalService": celos.oozieExternalService({
"oozie.wf.application.path": "/wordcount/workflow.xml",
"inputPath": "/input/" + dc + "/${year}-${month}-${day}/${hour}00/",
"outputPath": "/output/" + dc + "/${year}-${month}-${day}/${hour}00/",
})
});
}
defineWordCountWorkflow("nyc");
defineWordCountWorkflow("lax");
Here’s an overview of schedules, triggers, and scheduling strategies, each described below:
Each workflow has a schedule that determines the points in time (called slots) at which the workflow should run.
Celos supports cron-like schedules with [celos.cronSchedule]:
// A workflow using this schedule will run every hour.
celos.cronSchedule("0 0 * * * ?");
// A workflow using this schedule will run every day at midnight.
celos.cronSchedule("0 0 0 * * ?");
// A workflow using this schedule will run every day at 5am.
celos.cronSchedule("0 0 5 * * ?");
Another type of schedule is [celos.dependentSchedule], which makes a workflow use the same schedule as another workflow. This is useful for setting up a downstream workflow that tracks an upstream workflow, without having to duplicate the schedule definition.
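As a rough sketch (the workflow IDs and paths here are made up), a downstream workflow that reuses the wordcount workflow's schedule could look like this:
// Runs for exactly the same slots as the upstream "wordcount" workflow,
// without duplicating its schedule definition.
celos.defineWorkflow({
    "id": "wordcount-report",
    "schedule": celos.dependentSchedule("wordcount"),
    "schedulingStrategy": celos.serialSchedulingStrategy(),
    "trigger": celos.successTrigger("wordcount"),
    "externalService": celos.oozieExternalService({
        "oozie.wf.application.path": "/wordcount-report/workflow.xml"
    })
});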
For each slot of a workflow, a trigger is used to determine whether it’s ready to run, or needs to wait.
Let’s look at some commonly used simple triggers.
[celos.hdfsCheckTrigger] waits for a file or directory in HDFS:
// A slot at time T will wait for the file /logs/YYYY-MM-DD/HH00/_READY in HDFS.
celos.hdfsCheckTrigger("/logs/${year}-${month}-${day}/${hour}00/_READY");
[celos.successTrigger] waits for the success of another workflow, allowing the definition of dependencies among workflows:
// A slot at time T will wait until the slot at time T of
// the workflow with the ID "workflow-foo" is successful.
celos.successTrigger("workflow-foo")
[celos.delayTrigger] waits until the current wallclock time is a given number of seconds after the slot’s time:
// A slot at time T will wait until the current time is one hour after T.
celos.delayTrigger(60 * 60)
[celos.offsetTrigger] lets us offset another trigger a given number of seconds into the future or past.
// A slot at time T will wait until the _next hour's_ file is available in HDFS.
celos.offsetTrigger(60 * 60, celos.hdfsCheckTrigger("/logs/${year}-${month}-${day}/${hour}00/_READY"));
We can also combine triggers with [celos.andTrigger], [celos.orTrigger], and [celos.notTrigger]:
// A slot at time T will wait until both /input-a/YYYY-MM-DD/HH00/_READY
// and /input-b/YYYY-MM-DD/HH00/_READY are in HDFS.
celos.andTrigger(celos.hdfsCheckTrigger("/input-a/${year}-${month}-${day}/${hour}00/_READY"),
celos.hdfsCheckTrigger("/input-b/${year}-${month}-${day}/${hour}00/_READY"));
// A slot at time T will wait until the current hour's file, the next hour's file,
// and the file for the hour after that are in HDFS.
var hdfsCheck = celos.hdfsCheckTrigger("/logs/${year}-${month}-${day}/${hour}00/_READY");
celos.andTrigger(hdfsCheck,
celos.offsetTrigger(60 * 60 * 1, hdfsCheck),
celos.offsetTrigger(60 * 60 * 2, hdfsCheck));
// A slot at time T will be ready if, after one hour, the slot at time T
// of the other workflow "workflow-bar" is _not_ successful.
// This can be used to send an alert for example.
celos.andTrigger(celos.delayTrigger(60 * 60),
                 celos.notTrigger(celos.successTrigger("workflow-bar")));
This last trigger should be used in conjunction with a celos.dependentSchedule("workflow-bar").
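Putting the pieces together, such an alerting workflow could be sketched like this (the workflow ID and Oozie path are hypothetical; the actual alert, e.g. an email action, would live in the Oozie workflow):
// Becomes ready only if the slot of "workflow-bar" at the same time is
// still not successful one hour after its scheduled time.
celos.defineWorkflow({
    "id": "workflow-bar-alert",
    "schedule": celos.dependentSchedule("workflow-bar"),
    "schedulingStrategy": celos.serialSchedulingStrategy(),
    "trigger": celos.andTrigger(
        celos.delayTrigger(60 * 60),
        celos.notTrigger(celos.successTrigger("workflow-bar"))),
    "externalService": celos.oozieExternalService({
        "oozie.wf.application.path": "/alerts/workflow-bar-alert.xml"
    })
});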
A workflow’s scheduling strategy determines when and in which order the ready slots of the workflow should be run.
There’s only one scheduling strategy at the moment, [celos.serialSchedulingStrategy], which executes ready slots oldest first, with a configurable concurrency level.
// A workflow using this scheduling strategy will run three slots in parallel.
celos.serialSchedulingStrategy(3);
The main data sources Celos uses are:
The workflows directory contains JavaScript files that define workflows.
It may look like this:
workflows/
  wordcount.js
  some-other-workflow.js
  yet-another-workflow.js
The state database directory contains the state of each slot as a small JSON file.
db/
  state/
    wordcount-lax/
      2015-09-15/
        00:00:00.000Z
        01:00:00.000Z
        02:00:00.000Z
        ...
    wordcount-nyc/
      2015-09-15/
        00:00:00.000Z
        01:00:00.000Z
        02:00:00.000Z
        ...
An individual slot file in the state database, e.g. db/state/wordcount-lax/2015-09-15/01:00:00.000Z, looks like this:
{ "status": "SUCCESS", "externalID": "0008681-150911205802478-oozie-oozi-W", "retryCount": 0 }
The status field records the state the slot is in. The externalID field contains the Oozie ID of the corresponding Oozie workflow execution if the slot is running, successful, or failed (otherwise it’s null). The retryCount records how many times the slot has already been retried after failure.
On each scheduler step (typically triggered once per minute from cron), Celos evaluates all JavaScript files in the workflows directory, yielding a set of uniquely identified workflows.
Then, for each workflow, Celos fetches all slot files within a sliding window of 7 days before the current date from the state database.
Each slot is a state machine with the following states:
Celos takes the following action, depending on the state of the slot:
State | Action |
---|---|
WAITING | Call the workflow’s trigger to determine whether the slot is ready. If the trigger signals readiness, put the slot into the READY state. If the slot has been waiting for too long, put the slot into the WAIT_TIMEOUT state. Otherwise, keep the slot in the WAITING state. |
READY | Pass the slot as a candidate for scheduling to the workflow’s scheduling strategy. If the strategy chooses to execute the slot, submit it to Oozie, and put it into the RUNNING state. Otherwise, keep the slot in the READY state. |
RUNNING | Ask Oozie for the status of the execution. If the slot is still executing, keep it in the RUNNING state. If the slot has succeeded, put it into the SUCCESS state. If the slot has failed, but there are retries left, put the slot into the WAITING state again. If the slot has failed, and there are no more retries left, put the slot into the FAILURE state. |
SUCCESS | Do nothing. |
FAILURE | Do nothing. |
WAIT_TIMEOUT | Do nothing. |
KILLED | Do nothing. |
The state database contains additional information about slots that have been manually rerun with the /rerun HTTP API.
In the following example, the slots 2015-08-01T01:00Z and 2015-08-01T02:00Z of the workflow wordcount-nyc have been rerun. They are outside the sliding window, so the scheduling algorithm described above would not look at them. However, rerunning a slot touches an additional file in the rerun subdirectory of the state database, and slots for which such a file exists are fed into the scheduling algorithm in addition to the slots from the 7-day sliding window.
db/
  state/
    ... as above ...
  rerun/
    wordcount-nyc/
      2015-08-01/
        01:00:00.000Z
        02:00:00.000Z
Rerunning thus serves two purposes: besides its main use of re-executing a slot, it can also be used to backfill data, by marking slots outside the sliding window that the scheduler should care about.
Celos has a defaults directory that contains JavaScript files that can be imported into a workflow JavaScript file with [celos.importDefaults]. Such defaults files are used for sharing global variables and utility functions.
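As a sketch (the file name and helper function are made up), a defaults file might define a path helper that several workflow files then share:
// Hypothetical defaults file: defaults/paths.js
function hourlyPath(prefix, dc) {
    return prefix + "/" + dc + "/${year}-${month}-${day}/${hour}00/";
}

// In a workflow file, import the defaults file (without the ".js" suffix)
// and use the helper it defines.
celos.importDefaults("paths");

celos.defineWorkflow({
    "id": "wordcount-nyc",
    "schedule": celos.hourlySchedule(),
    "schedulingStrategy": celos.serialSchedulingStrategy(),
    "trigger": celos.hdfsCheckTrigger(hourlyPath("/input", "nyc") + "_READY"),
    "externalService": celos.oozieExternalService({
        "oozie.wf.application.path": "/wordcount/workflow.xml",
        "inputPath": hourlyPath("/input", "nyc"),
        "outputPath": hourlyPath("/output", "nyc")
    })
});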
Celos writes daily-rotating logs to a logs directory.
All directories (workflows, defaults, logs, and database) are configurable via [Server Command-Line Arguments].
Changing a workflow definition in Celos is as simple as updating the workflow JavaScript file and/or the Oozie workflow definition in HDFS. On the next scheduler step, Celos will pick up the changes.
Bundled with Celos comes a tool called Celos CI (see [Celos CI Reference] as well as samples/quickstart) that automates this process, and can be used in conjunction with GitHub and a CI server such as Jenkins for continuous delivery of Celos workflows.
For each group of related workflows, we have a GitHub repository and a Jenkins job that deploys the workflows on push to master using Celos CI. Celos CI copies the JavaScript files to the Celos host with SFTP, and uploads the Oozie workflow directory to HDFS.
As of September 2015, Celos has been in use at Collective for about two years, and is currently running all of our Hadoop processing (hundreds of individual workflows across dozens of projects).
Celos is productively used by people from different backgrounds, such as data science, operations, software engineering, and database administration, and has proven to be a welcome improvement on our previous Oozie coordinator-based scheduling.
We’re proud that in two years of use, not a single bug in Celos has caused any downtime, which is attributable to the small codebase (about 2500 non-blank, non-comment lines of code for core Celos, as measured by cloc 1.56) and the rigorous test suite (hundreds of unit tests and an extensive integration test).
- JDK 1.8
- Apache Hadoop 2.5.0
- Apache Oozie 4.1.0
You can probably get away with slightly older Hadoop and Oozie versions.
scripts/build.sh
This will build the following JARs:
- celos-server/build/libs/celos-server.jar (see [Celos Server Reference])
- celos-ci/build/libs/celos-ci-fat.jar (see [Celos CI Reference])
- celos-ui/build/libs/celos-ui.jar (see [Celos UI Reference])
Head over to samples/quickstart.
We’d love to help you try out and use Celos!
For now, please use the Issue Tracker if you have questions or comments.
Developers, developers, developers:
Head honcho: Chris Ingrassia
The options argument is an object with the following fields:
Name | Type | Required | Description |
---|---|---|---|
id | String | Yes | The identifier string for the workflow, must be unique. |
trigger | | Yes | The trigger that determines data availability for the workflow. |
schedule | | Yes | The schedule that determines the points in time at which the workflow should run. |
schedulingStrategy | | Yes | The scheduling strategy that determines when and in which order ready slots should be run. |
externalService | | Yes | The external service actually responsible for executing the job. |
| String (ISO 8601, UTC) | No | The date when the workflow should start executing (default: "1970-01-01T00:00Z"). |
| Number | No | The number of times a slot of this workflow should be automatically retried if it fails (default: 0). |
| Number | No | The number of seconds a workflow should stay waiting until it times out (default: |
celos.defineWorkflow({
"id": "my-workflow",
"schedule": celos.hourlySchedule(),
"schedulingStrategy": celos.serialSchedulingStrategy(),
"trigger": celos.alwaysTrigger(),
"externalService": celos.oozieExternalService({
"oozie.wf.application.path": "/my-workflow/workflow.xml",
"param1": "Hello",
"param2": "World"
})
});
Evaluates a file from the defaults directory in the current scope, so all variables and functions from the file become available in the current file.
Name | Type | Required | Description |
---|---|---|---|
| String | Yes | The name of the defaults file to import, without the ".js" suffix. |
A trigger determines, for each point in time at which a workflow runs, whether the preconditions for running the workflow (such as data availability, or the success of upstream workflows) are met.
Makes a workflow wait for a file or directory in HDFS. Often used to wait for _READY or _SUCCESS files.
Name | Type | Required | Description |
---|---|---|---|
| String | Yes | The HDFS path to wait for. May include the variables ${year}, ${month}, ${day}, ${hour}, ${minute}, and ${second}. |
| String | No | The HDFS name node URI to use. If not specified, the value of the [CELOS_DEFAULT_HDFS] variable will be used. |
Makes a workflow wait for the success of another workflow at the same time. This is used to define dependencies among workflows.
Name | Type | Required | Description |
---|---|---|---|
| String | Yes | The ID of the other workflow to wait for. |
Name | Type | Required | Description |
---|---|---|---|
| | No | The nested triggers. If no nested triggers are specified, the trigger is always ready. |
Name | Type | Required | Description |
---|---|---|---|
| | No | The nested triggers. If no nested triggers are specified, the trigger is never ready. |
Name | Type | Required | Description |
---|---|---|---|
| | Yes | The nested trigger to negate. |
Name | Type | Required | Description |
---|---|---|---|
| Number | Yes | The number of seconds to offset into the future (if positive) or past (if negative). |
| | Yes | The nested trigger to offset. |
Waits until a specified amount of time has passed between the slot’s scheduled time and the current wallclock time.
A schedule determines the points in time (slots) at which a workflow should run.
A cron-like schedule.
The full cron syntax is described here: http://www.quartz-scheduler.org/documentation/quartz-1.x/tutorials/crontrigger
Name | Type | Required | Description |
---|---|---|---|
| String | Yes | The cron expression. |
Name | Type | Required | Description |
---|---|---|---|
| String | Yes | The workflow ID of the other workflow. |
A scheduling strategy determines the order in which the ready slots of a workflow are executed.
Name | Type | Required | Description |
---|---|---|---|
| Number | No | The number of slots to execute at the same time (defaults to 1). |
An external service actually executes a workflow.
Name | Type | Required | Description |
---|---|---|---|
| Object | Yes | Properties to pass to Oozie. |
| String | No | The HTTP URL of the Oozie API. If not specified, the value of the [CELOS_DEFAULT_OOZIE] variable will be used. |
Inside property values, the variables ${year}, ${month}, ${day}, ${hour}, ${minute}, and ${second} will be replaced by the zero-padded values from the slot’s scheduled time.
year, month, day, hour, minute, and second will also be set as Oozie properties, so they can be used in the Oozie workflow XML file.
Additionally, Celos will set the Oozie property celosWorkflowName to a string like "my-workflow@2015-09-12T20:00Z", useful for display.
oozie.wf.application.path is the only property required by Oozie. It points to an Oozie workflow XML file within an Oozie workflow directory. There can be multiple XML files within a single Oozie workflow directory.
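For example (the paths and IDs below are hypothetical), two workflows can share one Oozie workflow directory by pointing at different XML files inside it:
// Both workflows use the /myapp directory in HDFS, but run different
// Oozie workflow XML files from it.
celos.defineWorkflow({
    "id": "myapp-import",
    "schedule": celos.hourlySchedule(),
    "schedulingStrategy": celos.serialSchedulingStrategy(),
    "trigger": celos.alwaysTrigger(),
    "externalService": celos.oozieExternalService({
        "oozie.wf.application.path": "/myapp/import-workflow.xml"
    })
});

celos.defineWorkflow({
    "id": "myapp-export",
    "schedule": celos.hourlySchedule(),
    "schedulingStrategy": celos.serialSchedulingStrategy(),
    "trigger": celos.successTrigger("myapp-import"),
    "externalService": celos.oozieExternalService({
        "oozie.wf.application.path": "/myapp/export-workflow.xml"
    })
});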
If [CELOS_DEFAULT_OOZIE_PROPERTIES] is defined and an Object, its members are added (before other properties, so they can be overridden) to the Oozie properties.
If defined, these global variables influence some API functions. It’s a good idea to set them in a common defaults file imported by all workflows.
The String value of this variable will be used as the default HDFS name node URI by [celos.hdfsCheckTrigger].
The String value of this variable will be used as the default Oozie API URL by [celos.oozieExternalService].
The members of this Object will be added (before other properties, so they can be overridden) to the Oozie properties of a workflow by [celos.oozieExternalService].
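As a sketch (the host names here are made up), a shared defaults file could set all three variables like this:
// Hypothetical shared defaults file, imported by every workflow
// via celos.importDefaults.
var CELOS_DEFAULT_HDFS = "hdfs://namenode.example.com:8020";
var CELOS_DEFAULT_OOZIE = "http://oozie.example.com:11000/oozie";
var CELOS_DEFAULT_OOZIE_PROPERTIES = {
    "user.name": "celos"
};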
The celos-server.jar launches Celos.
The celos-server.jar must be run in the following way, due to the need to put the Hadoop configuration on the classpath:
java -cp celos-server.jar:/etc/hadoop/conf com.collective.celos.server.Main <arguments...>
Name | Type | Required | Description |
---|---|---|---|
| Integer | Yes | HTTP port for server. |
| Path | No | Workflows directory (defaults to /etc/celos/workflows). |
| Path | No | Defaults directory (defaults to /etc/celos/defaults). |
| Path | No | Logs directory (defaults to /var/log/celos). |
| Path | No | State database directory (defaults to /var/lib/celos/db). |
| Integer | No | Interval (in seconds) between scheduler steps. If not supplied, Celos will not automatically step the scheduler, and wait for POSTs to the /scheduler servlet instead. |
Doing a POST to this servlet initiates a scheduler step.
In production we do this once a minute from cron.
Doing a GET to this servlet returns the list of workflows loaded into Celos.
Doing a GET to this servlet returns the slots of a workflow within a time range.
It also returns other information about the workflow, such as its paused state (see the /pause servlet).
Name | Type | Required | Description |
---|---|---|---|
id | String | Yes | ID of the workflow. |
| String (ISO 8601, UTC) | No | Time (exclusive) of most recent slot to return. Defaults to current time. |
| String (ISO 8601, UTC) | No | Time (inclusive) of earliest slot to return. Defaults to 1 week before the end time. |
curl "localhost:1234/workflow-slots?id=workflow-1"
prints:
{ "paused": false, "slots" : [ { "time" : "2015-09-13T13:50:00.000Z", "status" : "READY", "externalID" : null, "retryCount" : 0 }, { "time" : "2015-09-13T13:45:00.000Z", "status" : "SUCCESS", "externalID" : "0004806-150911205802478-oozie-oozi-W", "retryCount" : 0 }, { "time" : "2015-09-13T13:40:00.000Z", "status" : "SUCCESS", "externalID" : "0004804-150911205802478-oozie-oozi-W", "retryCount" : 0 }, ... ] }
Doing a GET to this servlet returns human-readable information about why a slot is waiting.
Name | Type | Required | Description |
---|---|---|---|
id | String | Yes | ID of the workflow. |
time | String (ISO 8601, UTC) | Yes | Scheduled time of slot to check. |
curl "localhost:1234/trigger-status?id=workflow-1&time=2015-09-13T13:00Z"
prints:
{ "type" : "AndTrigger", "ready" : false, "description" : "Not all nested triggers are ready", "subStatuses" : [ { "type" : "DelayTrigger", "ready" : false, "description" : "Delayed until 2015-09-14T16:00:00.000Z", "subStatuses" : [ ] }, { "type" : "HDFSCheckTrigger", "ready" : true, "description" : "HDFS path hdfs://nameservice1/logs/dc3/2015-09-14/1500 is ready", "subStatuses" : [ ] } ] }
Doing a POST to this servlet instructs Celos to mark a slot for rerun.
The slot’s state will be reset to waiting and its retry count will be reset to 0.
Name | Type | Required | Description |
---|---|---|---|
id | String | Yes | ID of the workflow. |
time | String (ISO 8601, UTC) | Yes | Scheduled time of slot to rerun. |
Doing a POST to this servlet marks a slot as killed and also kills its underlying Oozie job, if any.
Name | Type | Required | Description |
---|---|---|---|
id | String | Yes | ID of the workflow. |
time | String (ISO 8601, UTC) | Yes | Scheduled time of slot to kill. |
Doing a POST to this servlet pauses or unpauses a workflow. While a workflow is paused, the scheduler will simply ignore it. This means it doesn’t check any triggers for the workflow, doesn’t submit new jobs to the workflow’s external service, and doesn’t perform any other action related to the workflow.
You can check whether a workflow is paused by looking at the paused field of the result of the /workflow-slots servlet.
Name | Type | Required | Description |
---|---|---|---|
id | String | Yes | ID of the workflow. |
| Boolean | Yes | Whether to pause (true) or unpause (false) the workflow. |
The celos-ci-fat.jar can be used to deploy workflow and defaults JavaScript files, as well as Oozie workflow directories automatically.
java -jar celos-ci-fat.jar <arguments...>
Name | Type | Required | Description |
---|---|---|---|
| String | Yes | |
--workflowName | String | Yes | Name of workflow (or rather, project). |
| Path | Yes | The deployment directory (not needed for |
--target | URL | Yes | The target file (file: or sftp: URL). |
--hdfsRoot | Path | No | HDFS data will be placed under this root (defaults to /user/celos/app). |
A deployment directory must follow a canonical directory layout:
workflow.js
defaults.js
hdfs/
  workflow.xml
  ...
  lib/
    ...
- workflow.js is the JavaScript file containing workflow definitions.
- defaults.js is a JavaScript defaults file containing variables and utility functions.
- hdfs is the Oozie workflow directory.
If WORKFLOW_NAME is the value of the --workflowName argument, and HDFS_ROOT is the value of the --hdfsRoot argument, and WORKFLOWS_DIR and DEFAULTS_DIR are the Celos workflows and defaults directories specified in the target file, respectively, then the files will be deployed to the following locations:
workflow.js -> $WORKFLOWS_DIR/$WORKFLOW_NAME.js
defaults.js -> $DEFAULTS_DIR/$WORKFLOW_NAME.js
hdfs/       -> $HDFS_ROOT/$WORKFLOW_NAME
A target file is a JSON file that describes a Celos and HDFS setup.
Name | Type | Required | Description |
---|---|---|---|
hadoop.hdfs-site.xml | URL | Yes | URL of Hadoop hdfs-site.xml file. |
hadoop.core-site.xml | URL | Yes | URL of Hadoop core-site.xml file. |
defaults.dir.uri | URL | Yes | URL of Celos defaults directory. |
workflows.dir.uri | URL | Yes | URL of Celos workflows directory. |
All fields must be file: or sftp: URLs.
Example:
{
"hadoop.hdfs-site.xml": "sftp://celos002.ewr004.collective-media.net/etc/hadoop/conf/hdfs-site.xml",
"hadoop.core-site.xml": "sftp://celos002.ewr004.collective-media.net/etc/hadoop/conf/core-site.xml",
"defaults.dir.uri": "sftp://celos002.ewr004.collective-media.net/etc/celos/defaults",
"workflows.dir.uri": "sftp://celos002.ewr004.collective-media.net/etc/celos/workflows",
}
The best practice for using Celos CI is putting a target file for each Celos installation (e.g. production and staging) in a central, SFTP-accessible location, and storing the target file’s SFTP URL in an environment variable (e.g. PRODUCTION_TARGET and STAGING_TARGET). Deploy scripts using Celos CI should then pass this variable as the --target argument to Celos CI, making them independent of the Celos installation to which the workflow is to be deployed. See samples/quickstart for an example.
The celos-ui.jar runs the Celos user interface.
java -jar celos-ui.jar <arguments...>
Name | Type | Required | Description |
---|---|---|---|
| Integer | Yes | HTTP port for UI. |
| URL | Yes | Celos URL. |
| URL | No | Hue (Oozie UI) URL. If specified, slots become linked to Hue. |
--config | Path | No | JSON [UI Config File]. |
If the --config argument is not supplied to the UI, it simply renders one long list of all loaded workflows.
By supplying a JSON file with the following format to --config, the workflows can be hierarchically grouped (one level):
{
"groups":
[
{
"name": "Group 1",
"workflows":
[
"workflow-1",
"workflow-2"
]
},
{
"name": "Group 2",
"workflows":
[
"workflow-3"
]
}
]
}
The Lord of the Rings Dictionary defines it as:
Celos S; also Kelos; freshet; kel- flow away [Sil; *kelu-]; one would want to choose los snow [Sil] for the final element, but the text of Unfinished Tales, Index, entry Celos states the final form derives from Q -sse, -ssa, a form of emphasis [some say locative], making the definition 'much flowing' or 'freshet', often resulting from melting snow; perhaps 'snow' is then implied from the ending; a river in Gondor
Alternatively, the Devil’s Dictionary of Programming defines it as:
Configurable: It’s your job to make it usable.
Elegant: The only use case is making me feel smart.
Lightweight: I don’t understand the use-cases the alternatives solve.
Opinionated: I don’t believe that your use case exists.
Simple: It solves my use case.