Pipelines
A Pipeline is a sequence of adaptors, defined in the configuration section of pipeline.js, which connects a source adaptor, any number of transformers and any number of sink adaptors to create a path for messages to flow down. A pipeline is typically defined at the end of the pipeline.js JavaScript file. Here is a typical generated pipeline.js file, less some of the commented-out configuration options:
var source = mongodb({
"uri": "${MONGODB_URI}"
})
var sink = elasticsearch({
"uri": "${ELASTICSEARCH_URI}"
})
t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")
There are three ways you can create and chain adapters. All of them start with t., where the t stands for "transporter", and all the pipelines are "chained" to it. Three JavaScript functions, Source(), Save() and Transform(), are used to create a pipeline.
The default generated version of the pipeline is:
t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")
Here, each pipeline component takes three parameters. These are, in order, a name, an adapter and a namespace string. So here we have a Source with the name "source", using the adapter in the variable source and a namespace "/.*/", a regular expression which will match anything.
Namespaces can be omitted and default to matching anything, which leads to a second version of the pipeline:
t.Source("source", source).Save("sink", sink)
The name can also be omitted for an even more compact pipeline expression, which gives us a third version:
t.Source(source).Save(sink)
This is the most compact invocation. The adapter variable is the only thing passed to the pipeline component. The drawback is that the name is autogenerated as a long hex UUID, so be aware that the logs, which refer to nodes by name, will be full of these ids. We suggest you stick with the longer, three-parameter version of the call for anything other than quick tests or proof-of-concept work.
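The three call forms differ only in which arguments are left out. The sketch below restates the defaults described above in plain JavaScript. It is an illustration, not Transporter's implementation: `describeNode` is a hypothetical helper, the `source` object is a stand-in for the adaptor variable, and the string `<autogenerated id>` is a placeholder for the generated UUID.

```javascript
// Hypothetical sketch, not Transporter's implementation: resolve the three
// call forms to one { name, adaptor, namespace } description.
function describeNode(a, b, c) {
  var named = typeof a === "string";
  return {
    // Without a name, Transporter generates one; a placeholder stands in here.
    name: named ? a : "<autogenerated id>",
    adaptor: named ? b : a,
    // An omitted namespace defaults to matching anything.
    namespace: c === undefined ? "/.*/" : c
  };
}

var source = { type: "mongodb" }; // stand-in for the adaptor variable

console.log(describeNode("source", source, "/.*/")); // three-parameter form
console.log(describeNode("source", source));         // namespace omitted
console.log(describeNode(source));                   // name and namespace omitted
```

The first two calls describe the same node; only the third loses the readable name.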
A node's namespace can be a simple or complex regular expression or a fragment of text which will match anything that contains that text. For an exact match to, say, "table", the namespace would be /^table$/ with the ^ denoting the start and the $ denoting the end of the string to match.
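The difference between a text fragment and an anchored pattern can be tried out with ordinary JavaScript regular expressions. This is only an illustration under the assumption that JavaScript's matching behaves the same way for these simple patterns; Transporter evaluates namespaces itself, and `toRegExp` and `matches` are hypothetical helpers, not part of Transporter.

```javascript
// Hypothetical helper: accept either "/pattern/" or a bare text fragment.
function toRegExp(namespace) {
  var m = /^\/(.*)\/$/.exec(namespace);
  return new RegExp(m ? m[1] : namespace);
}

// Hypothetical helper: does the given name match the namespace?
function matches(namespace, name) {
  return toRegExp(namespace).test(name);
}

// A plain fragment matches any name that contains it.
console.log(matches("table", "table"));      // true
console.log(matches("table", "timetables")); // true

// Anchoring with ^ and $ restricts the match to the exact name.
console.log(matches("/^table$/", "table"));      // true
console.log(matches("/^table$/", "timetables")); // false
```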
How the namespace is used depends on the type of node.
With a Source(), the namespace is passed down to the underlying adaptor as a request to pull data from sources which match the namespace. For example, with MongoDB, the adaptor uses the namespace to decide which collections to read. So, if a database has collections robocop, robosaurus and robbietherobot, then a namespace of robo would select all of them (as they all contain robo), while a namespace of /^robo.*/ would only select robocop and robosaurus. Every document retrieved from those collections would have its namespace set to the name of the collection it came from.
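The collection example can be simulated in a few lines of JavaScript. This is a sketch under stated assumptions: the `database` object is an in-memory stand-in for MongoDB, `readSource` and `namespacesOf` are hypothetical helpers, and the `{ namespace, data }` message shape is made up for the illustration.

```javascript
// Hypothetical in-memory stand-in for a database: collection name -> documents.
var database = {
  robocop: [{ _id: 1 }],
  robosaurus: [{ _id: 2 }],
  robbietherobot: [{ _id: 3 }]
};

// Hypothetical helper: accept either "/pattern/" or a bare text fragment.
function toRegExp(namespace) {
  var m = /^\/(.*)\/$/.exec(namespace);
  return new RegExp(m ? m[1] : namespace);
}

// Sketch of a source: read every collection whose name matches the namespace
// and tag each document with the name of the collection it came from.
function readSource(db, namespace) {
  var pattern = toRegExp(namespace);
  var messages = [];
  Object.keys(db).forEach(function (collection) {
    if (!pattern.test(collection)) return;
    db[collection].forEach(function (doc) {
      messages.push({ namespace: collection, data: doc });
    });
  });
  return messages;
}

// Collect the namespace carried by each message.
function namespacesOf(messages) {
  return messages.map(function (m) { return m.namespace; });
}

console.log(namespacesOf(readSource(database, "robo")));
// [ 'robocop', 'robosaurus', 'robbietherobot' ]
console.log(namespacesOf(readSource(database, "/^robo.*/")));
// [ 'robocop', 'robosaurus' ]
```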
For Save() sinks and Transform() transformers, the namespace setting acts as a filter on which messages those sinks or transformers are applied to. An incoming message is handed to a sink or transformer only if the message's namespace matches that node's namespace setting. If no namespace is specified, the sinks and transformers will accept any message.
Note that the transformers in the pipeline can modify a message's namespace, allowing the pipeline to deliver messages to different sinks from one source.
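The filtering and rerouting behaviour can be pictured with a small simulation. Everything here is an assumption made for illustration: `makeSink` and `reroute` are hypothetical, the `orders` and `archive` namespaces and the `archived` field are invented, and real sinks and transformers are configured in pipeline.js rather than written like this.

```javascript
// Hypothetical sketch of a sink: it keeps a message only if the message's
// namespace matches the sink's namespace setting.
function makeSink(name, namespace) {
  // No namespace given: accept any message.
  var pattern = new RegExp(namespace === undefined ? ".*" : namespace);
  var received = [];
  return {
    name: name,
    received: received,
    offer: function (msg) {
      if (pattern.test(msg.namespace)) received.push(msg);
    }
  };
}

// Hypothetical transformer: moves archived documents to another namespace.
function reroute(msg) {
  if (msg.data.archived) {
    return { namespace: "archive", data: msg.data };
  }
  return msg;
}

var live = makeSink("live", "^orders$");
var archive = makeSink("archive", "^archive$");
var everything = makeSink("everything"); // no namespace setting

// Both messages come from one source namespace.
var messages = [
  { namespace: "orders", data: { id: 1, archived: false } },
  { namespace: "orders", data: { id: 2, archived: true } }
];

messages.map(reroute).forEach(function (msg) {
  [live, archive, everything].forEach(function (sink) { sink.offer(msg); });
});

console.log(live.received.length);       // 1
console.log(archive.received.length);    // 1
console.log(everything.received.length); // 2
```

Because the transformer rewrites the namespace before the sinks see the message, one source feeds two differently filtered sinks.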
Transporter is open source, built by the good people of Compose (and you, if you want to contribute).