V2: Intro

Just examples. Think about how you would do them without stream-chain.

Simple pipeline

const Chain = require('stream-chain');

const fs = require('node:fs');
const zlib = require('node:zlib');
const {Transform} = require('node:stream');

// the chain will work on an object-mode stream of numbers
const chain = new Chain([
  // transforms a value
  x => x * x,
  // returns several values
  x => [x - 1, x, x + 1],
  // waits for an asynchronous operation
  async x => await getTotalFromDatabaseByKey(x),
  // returns multiple values with a generator
  function* (x) {
    for (let i = x; i >= 0; --i) {
      yield i;
    }
  },
  // filters out even values
  x => x % 2 ? x : null,
  // uses an arbitrary transform stream
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, x.toString());
    }
  }),
  // compress
  zlib.createGzip()
]);
// log errors
chain.on('error', error => console.log(error));
// use the chain, and save the result to a file
dataSource.pipe(chain).pipe(fs.createWriteStream('output.txt.gz'));
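
The dataSource above can be any object-mode Readable producing numbers. A minimal sketch using Node's Readable.from (the sample values are just an illustration):

const {Readable} = require('node:stream');

const dataSource = Readable.from([1, 2, 3]);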

Using asynchronous generators

const {chain} = require('stream-chain');

const family = chain([
  async function*(person) {
    yield person;
    // asynchronously retrieve parents
    if (person.father) {
      yield await getPersonFromDB(person.father);
    }
    if (person.mother) {
      yield await getPersonFromDB(person.mother);
    }
    // asynchronously retrieve children, if any
    if (person.children) {
      for (const childId of person.children) {
        yield await getPersonFromDB(childId);
      }
    }
  },
  new Transform({
    writableObjectMode: true,
    transform(x, _, callback) {
      // transform to text
      callback(null, JSON.stringify(x));
    }
  }),
  zlib.createGzip(),
  fs.createWriteStream('families.json-stream.gz')
]);

people.pipe(family);

Combined functions

A block of regular functions can be grouped into an array. Functions in such a block are combined without using streams, which improves performance.

const lessEfficient = chain([
  x => x * x,
  x => 2 * x
  // ... more stages
]);

const moreEfficient = chain([
  [
    x => x * x,
    x => 2 * x
  ]
  // ... more stages
]);

const comp = require('stream-chain/utils/comp');

const evenMoreEfficient = chain([
  comp(
    x => x * x,
    x => 2 * x
  )
  // ... more stages
]);

// or use gen() the same way:
const gen = require('stream-chain/utils/gen');

const theCoolest = chain([
  gen(
    x => x * x,
    x => 2 * x
  )
  // ... more stages
]);
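
All four chains above behave identically; for example:

// input: 1, 2, 3
// output: 2, 8, 18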

Filter out values

Returning none discards the current value: the rest of the pipeline is skipped and nothing is emitted for it. In the example below, it is used to filter out all even values.

const {chain, none, final} = require('stream-chain');

const pipeline = chain([
  [
    x => x % 2 ? x : none,
    x => x * x,
    x => 2 * x
  ]
  // ... more stages
]);

// input: 1, 2, 3, 4
// output: 2, 18

Return a final value prematurely

Wrapping a value in final() stops further processing of that value: the rest of the pipeline is skipped and the wrapped value becomes the result. The example below does not double odd values.

const {chain, none, final} = require('stream-chain');

const pipeline = chain([
  [
    x => x * x,
    x => x % 2 ? final(x) : x,
    x => 2 * x
  ]
  // ... more stages
]);

// input: 1, 2, 3, 4
// output: 1, 8, 9, 32

Create Transform out of functions

Sometimes all we need is to wrap a function into a Transform. It works with combined functions as well.

Wrap a function

const {convertToTransform} = require('stream-chain');

const stream = convertToTransform(x => x + 1);

Wrap and combine functions

const {convertToTransform} = require('stream-chain');

const stream = convertToTransform([
  x => x * x,
  x => 2 * x
]);
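
The result is a regular Transform stream, so it can be driven with the standard stream API. A minimal sketch, assuming object mode and the combined functions above (the written values are just an illustration):

stream.on('data', value => console.log(value)); // logs 2, then 8
stream.write(1);
stream.write(2);
stream.end();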

Slicing streams

Take

This example processes only 5 items from the beginning of a stream.

const take = require('stream-chain/utils/take');

const pipeline = chain([
  take(5)
  // ... more stages
]);
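
For example:

// input: 1, 2, 3, 4, 5, 6, 7
// output: 1, 2, 3, 4, 5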

Skip

This example skips 5 items from the beginning of a stream.

const skip = require('stream-chain/utils/skip');

const pipeline = chain([
  skip(5)
  // ... more stages
]);
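
For example:

// input: 1, 2, 3, 4, 5, 6, 7
// output: 6, 7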

Take & skip together

This example skips 5 items from the beginning of a stream and takes the next 5.

const lessEfficient = chain([
  skip(5),
  take(5)
  // ... more stages
]);

const moreEfficient = chain([
  take({n: 5, skip: 5})
  // ... more stages
]);
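
For example, both chains above produce:

// input: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12
// output: 6, 7, 8, 9, 10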

Conditional take

Takes items while a condition is true.

const takeWhile = require('stream-chain/utils/takeWhile');

const pipeline = chain([
  takeWhile(item => item !== 'separator')
  // ... more stages
]);
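
For example:

// input: 'a', 'b', 'separator', 'c'
// output: 'a', 'b'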

Conditional skip

Skips items while a condition is true.

const skipWhile = require('stream-chain/utils/skipWhile');

const pipeline = chain([
  skipWhile(item => item !== 'separator')
  // ... more stages
]);
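
For example (note that the separator itself is not skipped):

// input: 'a', 'b', 'separator', 'c'
// output: 'separator', 'c'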

Conditional take & skip together

Processes data between the first two separators.

const pipeline = chain([
  skipWhile(item => item !== 'separator'),
  skip(1), // skip the separator
  takeWhile(item => item !== 'separator')
  // ... more stages
]);
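
For example:

// input: 'a', 'separator', 'b', 'c', 'separator', 'd'
// output: 'b', 'c'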

Folding

It is the same idea as reduce() on JavaScript arrays.

Folding

const fold = require('stream-chain/utils/fold');

const pipeline = chain([
  fold((acc, x) => acc + x, 0)
  // ... more stages
]);

// input: 1, 2, 3
// output: 6

Scanning

scan() is like fold() but outputs all intermediate values of its accumulator.

const scan = require('stream-chain/utils/scan');

const pipeline = chain([
  scan((acc, x) => acc + x, 0)
  // ... more stages
]);

// input: 1, 2, 3
// output: 1, 3, 6

Dedicated reducer

Reduce is a Writable stream used at the end of a pipeline to accumulate items. Its accumulator is available as the accumulator property. It takes the same kind of reducing function and initial value as fold() and scan().

const {reduce} = require('stream-chain/utils/Reduce');

const toArray = reduce((acc, x) => {
  acc.push(x);
  return acc;
}, []);

const pipeline = chain([
  // ... more stages
  toArray
]);

// input: 1, 2, 3
// toArray.accumulator is [1, 2, 3]
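
Because Reduce is a Writable stream, the accumulated result can be read once it emits the standard 'finish' event. A minimal sketch (dataSource is a placeholder object-mode Readable):

dataSource.pipe(pipeline);
toArray.on('finish', () => {
  console.log(toArray.accumulator); // [1, 2, 3] for the input above
});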

Combined functions on steroids!

Unlike array-combined functions, comp() and gen() can combine asynchronous and regular functions as well as generators. They allow functions to return multiple values wrapped in many(). Of course, none and final() work too.

Using generators

const {comp} = require('stream-chain/utils/comp');

const pipeline = chain([
  comp(
    function*(x) {
      yield x;
      yield 10 * x;
    },
    x => 2 * x
  )
  // ... more stages
]);

// input: 1, 2, 3
// output: 2, 20, 4, 40, 6, 60

Using many()

const {chain, many} = require('stream-chain');
const {comp} = require('stream-chain/utils/comp');

const pipeline = chain([
  comp(
    x => many([x, 10 * x]),
    x => 2 * x
  )
  // ... more stages
]);

// input: 1, 2, 3
// output: 2, 20, 4, 40, 6, 60

Using asynchronous functions

const pipeline = chain([
  comp(
    async x => await getItemNumberFromDB(x),
    x => 2 * x
  )
  // ... more stages
]);

Combine functions as a function

Making a separate function has its benefits: it can be used with streams or without them. comp.asFun() returns an asynchronous function.

const doubler = comp.asFun(
  async x => await getItemNumberFromDB(x),
  x => 2 * x
);

const pipeline = chain([
  doubler
  // ... more stages
]);

doubler(42)
  .then(value => console.log(value))
  .catch(error => console.error(error));