This repository has been archived by the owner on Oct 17, 2023. It is now read-only.

Pipelines

Dj Walker-Morgan edited this page Mar 29, 2017 · 3 revisions

A pipeline is a sequence of adaptors, defined in the configuration section of pipeline.js, which connects a source adaptor, any number of transformers and any number of sink adaptors to create a path for messages to flow down. A pipeline is typically defined at the end of the pipeline.js JavaScript file. Here is a typical generated pipeline.js file, with some of the commented-out configuration options removed:

var source = mongodb({
  "uri": "${MONGODB_URI}"
})

var sink = elasticsearch({
  "uri": "${ELASTICSEARCH_URI}"
})

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

Three JavaScript functions, Source(), Save() and Transform(), are used to create a pipeline. Every pipeline starts with t., where the t stands for "transporter", and each component is chained onto it. There are three ways to call these functions when creating and chaining adapters.

The default generated version of the pipeline is:

t.Source("source", source, "/.*/").Save("sink", sink, "/.*/")

Here, each pipeline component takes three parameters. These are, in order, a name, an adapter and a namespace string. So here we have a Source with the name "source", using the adapter in the variable source and a namespace "/.*/", a regular expression which will match anything.

Namespaces can be omitted and default to matching anything, which leads to a second version of the pipeline:

t.Source("source", source).Save("sink", sink)

The name can also be omitted for an even more compact pipeline expression, which gives us a third version:

t.Source(source).Save(sink)

This is the most compact invocation. The adapter variable is the only thing passed to the pipeline component. The drawback is that the name is autogenerated as a long hex UUID, so be aware that the logs, which refer to nodes by name, will be full of these IDs. We suggest you stick with the longer, three-parameter version of the call for anything other than quick tests or proof-of-concept work.

Namespaces

A node's namespace can be a simple or complex regular expression, or a fragment of text which will match anything that contains that text. For an exact match on, say, "table", the namespace would be /^table$/, with the ^ denoting the start and the $ denoting the end of the string to match.
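The difference between a text fragment and an anchored expression can be tried out in plain JavaScript. This is only a sketch: namespaceToRegExp is a hypothetical helper, and the rule it applies (a value wrapped in slashes is a regular expression, anything else is literal text matched anywhere) is an assumption made for illustration, not a description of how Transporter parses namespaces internally.

```javascript
// Hypothetical helper: turn a namespace string into a JavaScript RegExp.
// Assumption: a value wrapped in slashes is a regular expression; anything
// else is treated as literal text to find anywhere in the name.
function namespaceToRegExp(namespace) {
  const wrapped =
    namespace.length >= 2 &&
    namespace.startsWith("/") &&
    namespace.endsWith("/");
  if (wrapped) {
    // Drop the leading and trailing slash and compile the rest.
    return new RegExp(namespace.slice(1, -1));
  }
  // Escape regex metacharacters so the fragment is matched literally.
  return new RegExp(namespace.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"));
}

const fragment = namespaceToRegExp("table");
const exact = namespaceToRegExp("/^table$/");

console.log(fragment.test("table"));      // true
console.log(fragment.test("timetables")); // true, the name contains "table"
console.log(exact.test("table"));         // true
console.log(exact.test("timetables"));    // false, anchored at both ends
```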

How the namespace is used depends on the type of node.

With a Source(), the namespace is passed down to the underlying adaptor as a request to pull data from sources which match the namespace. For example, with MongoDB, the adaptor uses the namespace to choose which collections to read from. So, if a database has collections robocop, robosaurus and robbietherobot, then a namespace of robo would select all of them (as they all contain robo), while a namespace of /^robo.*/ would only select robocop and robosaurus. Every document retrieved from those collections would have its namespace set to the name of the collection it came from.
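The collection example can be checked with ordinary JavaScript regular expressions. The snippet below is a stand-alone illustration and does not talk to MongoDB or Transporter; selectCollections is a hypothetical helper introduced only for this sketch.

```javascript
// The three collection names used in the example above.
const collections = ["robocop", "robosaurus", "robbietherobot"];

// Hypothetical helper: keep only the names that match the namespace pattern.
function selectCollections(names, pattern) {
  return names.filter((name) => pattern.test(name));
}

// An unanchored pattern matches any name that contains "robo".
const containsRobo = selectCollections(collections, /robo/);

// An anchored pattern matches only names that start with "robo".
const startsWithRobo = selectCollections(collections, /^robo.*/);

console.log(containsRobo);   // [ 'robocop', 'robosaurus', 'robbietherobot' ]
console.log(startsWithRobo); // [ 'robocop', 'robosaurus' ]
```

Note that robbietherobot is picked up by the unanchored pattern only because it ends in "robot".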

For Save() sinks and Transform() transformers, the namespace setting acts as a filter on which messages those sinks or transformers are applied to. The namespace of an incoming message has to match the namespace setting of the sink or transformer before the message is handed to it. If no namespace is specified, sinks and transformers accept any message.
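The filtering behaviour can be modelled in a few lines of plain JavaScript. This is a simulation rather than Transporter code: makeNode and deliver are hypothetical helpers, and the message shape (an object with an ns field holding the namespace and a data field holding the document) is an assumption made for the sketch.

```javascript
// Hypothetical node: a name, a namespace pattern and a list of the
// messages it has accepted. With no pattern, the node matches anything.
function makeNode(name, pattern) {
  return { name, pattern: pattern || /.*/, received: [] };
}

// Hand a message to every node whose pattern matches its namespace.
function deliver(message, nodes) {
  for (const node of nodes) {
    if (node.pattern.test(message.ns)) {
      node.received.push(message);
    }
  }
}

const robots = makeNode("robots", /^robo/);
const everything = makeNode("everything"); // no namespace given

// Assumed message shape: ns is the namespace, data is the document.
const messages = [
  { ns: "robocop", data: { id: 1 } },
  { ns: "users", data: { id: 2 } },
];

messages.forEach((message) => deliver(message, [robots, everything]));

console.log(robots.received.length);     // 1, only the "robocop" message
console.log(everything.received.length); // 2, accepts any namespace
```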

Note that the transformers in the pipeline can modify a message's namespace, allowing the pipeline to deliver messages from one source to different sinks.
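As a rough sketch of that idea, the plain JavaScript below rewrites the namespace of some messages and then routes them by namespace. Nothing here is Transporter's own API: routeArchived, route, the ns and data fields, and the "orders" and "archive" namespaces are all hypothetical names chosen for illustration.

```javascript
// Hypothetical transformer: move documents flagged as archived into a
// different namespace, leaving every other message untouched.
function routeArchived(message) {
  if (message.data.archived) {
    return Object.assign({}, message, { ns: "archive" });
  }
  return message;
}

// Two simulated sinks, each filtering on its own namespace pattern.
const liveSink = { pattern: /^orders$/, received: [] };
const archiveSink = { pattern: /^archive$/, received: [] };

// Hand a message to every sink whose pattern matches its namespace.
function route(message, sinks) {
  for (const sink of sinks) {
    if (sink.pattern.test(message.ns)) {
      sink.received.push(message);
    }
  }
}

// Every message leaves the single source with the same namespace.
const fromSource = [
  { ns: "orders", data: { id: 1, archived: false } },
  { ns: "orders", data: { id: 2, archived: true } },
];

fromSource
  .map(routeArchived)
  .forEach((message) => route(message, [liveSink, archiveSink]));

console.log(liveSink.received.map((m) => m.data.id));    // [ 1 ]
console.log(archiveSink.received.map((m) => m.data.id)); // [ 2 ]
```

Because the transformer runs before the sinks see the message, each sink's namespace filter is applied to the rewritten namespace rather than the original one.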
