Otherwise not picking up empty stream #570

Open
jaidetree opened this issue Nov 19, 2016 · 11 comments

Comments

@jaidetree
Contributor

jaidetree commented Nov 19, 2016

In my application repo I have the following function. I stream an immutable object down the pipeline, and a filter checks whether we need to run a git diff to decide between copying the last version's assets and running a full build. However, when the filter returns false, otherwise never fires. The stream appears to end, yet the same function works as expected in my unit test. Can you think of any reason why the stream would stop when that filter function returns false? I've put a console.log statement in the otherwise stream-returning function, but it does not fire.

function getBuildAction (stream) {
  let stateStream = stream.observe();

  return stream
    // Only process when we know there is a newer version of the source
    .filter((state) => state.lastAssetVersion !== state.currentAssetVersion)
    // Diff the repo against the target branch & emit diff files
    .flatMap((state) => runGitDiff(state.sourceDir))
    // Flip back to original state stream if diff was not required
    .otherwise(stateStream)
}

The code before it also runs from an observe like this:

function createBuildStream (stateStream) {
  let buildStream = stateStream.observe();

  buildStream
    .through(getBuildAction) // Where the stream stops
    .through(buildProcess) // Runs a bunch of commands
    .pipe(writeToLog()); // returns fs.createWriteStream

  return stateStream;
}
@vqvu
Collaborator

vqvu commented Nov 20, 2016

I can't think of any reason why this wouldn't work. However, the implementation of otherwise uses the stream redirect feature, and that feature has always been a bit brittle in 2.x, so I wouldn't be surprised if this is a race condition bug in Highland.

Can you provide a test case that I can run that causes this behavior? I can't tell you much more without one.

Alternatively, the workaround is to implement getBuildAction without using otherwise.

function getBuildAction (stream) {
  return stream
    .flatMap(state => {
      if (state.lastAssetVersion !== state.currentAssetVersion) {
        return runGitDiff(state.sourceDir);
      } else {
        return _.of(state);
      }
    });
}

@jaidetree
Contributor Author

jaidetree commented Nov 20, 2016

Hah the solution I've put in place of the otherwise is almost the exact same as what you proposed. Guess I'm slowly starting to get a feel for it! Though, it would be all the more rewarding to create these elaborate flows without if statements if possible.

I've been trying to isolate it in a test case, but unfortunately it's a little challenging to track down. Below is the closest I've come to reproducing the error: none of the getBuildAction steps are executed and the test case times out. It's not quite the exact situation, but I think it's similar. I will continue to hunt down the exact case tomorrow.

let _ = require('highland');
let assert = require('assert');

function getBuildAction (stream) {
  let stateStream = stream.observe();

  return stream
    // Only process when we know there is a newer version of the source
    .filter((state) => state.lastAssetVersion !== state.currentAssetVersion)
    // Diff the repo against the target branch & emit diff files
    .flatMap((state) => _.of({ action: 'build' }))
    // Flip back to original state stream if diff was not required
    .otherwise(stateStream)
}

it('Should return a non-buildable state if not new source', (done) => {
  let state = {
    action: null,
    lastAssetVersion: 'abcdefgh',
    currentAssetVersion: 'abcdefgh'
  };
  let stream = _.of(state).observe();

  return stream
    .through(getBuildAction)
    .toCallback((err, buildState) => {
      assert.equal(buildState.action, null, 'Build action should be unmodified and left as null.');
      done(err, buildState);
    })
});

@vqvu
Collaborator

vqvu commented Nov 20, 2016

Your test case doesn't execute properly because you only observe your source stream but never consume it. Thus, your observer stream never sees anything.

If you change your code to this, it should work.

  let stream = _.of(state);

  stream.observe()
    .through(getBuildAction)
    .toCallback((err, buildState) => {
      assert.equal(buildState.action, null, 'Build action should be unmodified and left as null.');
      done(err, buildState);
    })

  stream.resume();

This is probably not the root cause of your problem, right?

@jaidetree
Contributor Author

Oh, my mistake. Yeah, unfortunately the reported issue is not so simple: it fires that first filter operation, then stops. My hunch is that, since this function works fine in the unit test, it has more to do with the state of the stream coming into it.

@jaidetree
Contributor Author

Hmm no luck so far but I do have a couple of questions:

1.) When are .through functions fired? It seems like it is happening a lot sooner than when their stream transforms are fired.

2.) What's the best way to debug the state of the stream? Just add console statements to node_modules/highland/src/index.js?

@vqvu
Collaborator

vqvu commented Nov 21, 2016

  1. through functions are called immediately. stream.through(fn) is syntactic sugar for fn(stream).

  2. Yes. If you want to see the state of the stream, add console statements to index.js. There's already a bunch of commented out debug statements that you might find useful.

You'll want to reduce the test case as much as possible first. The stream state being printed out can get verbose very quickly as your pipeline becomes more complicated. Try starting with a failing pipeline and removing transforms until it stops breaking. For example, is .through(buildProcess) necessary? What about .pipe(writeToLog())? Can that be replaced with .resume() or .pipe(process.stdout)?

@jaidetree
Contributor Author

Hey again, it's been a while. Today I was updating one of the util functions this project uses and while writing unit tests for it I stumbled upon the cause of this issue and confirmed it with a separate, isolated test.

In short, before the aforementioned through function I was calling .consume((err, x, push, next) => { ... }) and pushing data but not calling next, as I didn't think it was always necessary, as in a stream generator.

describe('consume', () => {
  it('Should fail if next is not called', (done) => {
    let preFilterSpy = expect.createSpy();
    let endSpy = expect.createSpy();

    _([ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ])
      .consume((err, x, push) => {
        push(err, x);
      })
      .tap(preFilterSpy)
      .filter(x => x === 'h')
      .otherwise(_.of('h'))
      .each(x => {
        expect(x).toBe('h');
      })
      .done(endSpy);

    expect(preFilterSpy).toHaveBeenCalled();
    expect(preFilterSpy.calls.length).toBe(2);
    expect(endSpy).toNotHaveBeenCalled();
    done();
  });

  it('Should succeed if next is called', (done) => {
    let preFilterSpy = expect.createSpy();
    let endSpy = expect.createSpy();

    _([ 'a', 'b', 'c', 'd', 'e', 'f', 'g' ])
      .consume((err, x, push, next) => {
        push(err, x);
        if (x !== _.nil) next();
      })
      .tap(preFilterSpy)
      .filter(x => x === 'h')
      .otherwise(_.of('h'))
      .each(x => {
        expect(x).toBe('h');
      })
      .done(endSpy);

    expect(preFilterSpy).toHaveBeenCalled();
    expect(preFilterSpy.calls.length).toBe(7);
    expect(endSpy).toHaveBeenCalled();
    done();
  });
});

While I'm ecstatic this issue is resolved, I still have a couple of questions for you @vqvu if it's no trouble.

  1. What is the difference between calling next() in a value generator vs next() in a consume function? For clarity purposes "value generator" refers to the _((push, next) => { ... }); api.

  2. When would you not call next() in both a value generator function and a consume method?

@vqvu
Collaborator

vqvu commented Mar 21, 2017

I'm glad you found the problem! And thanks for posting your repro case. It actually shows an unrelated bug in Highland. In the failure case, preFilterSpy.calls.length should be 1 and not 2, since you never call next() in your consume handler. Fix in #608.

To answer your questions:

What is the difference between calling next() in a value generator vs next() in a consume function?

They're not all that different. next is used in both cases to signal that you are ready for the handler to be called again. Typically, you call it once you are done calling push.

For value generators, generally, you'll have code that looks like

  1. The generator is called.
  2. The generator produces some values and emits them with push. It may do this asynchronously.
  3. The generator calls push(null, _.nil) if there's no more data, or it calls next to ask Highland to call it again when the stream needs more data.

For consume handlers, you have a very similar thing,

  1. The handler is called with the first value that the source emits.
  2. The handler transforms that value into 0 or more other values and emits them with push. It may do this asynchronously.
  3. The handler calls push(null, _.nil) if it doesn't want to emit any more data, or it calls next to ask Highland for the next value that the source emits. As before, Highland will only call the handler again when the stream needs more data.

When would you not call next() in both a value generator function and a consume method?

I can think of two reasons.

  1. You are not allowed to call next() after pushing _.nil. For a consume handler, this is true even if you have not received a _.nil from the source stream. For example, take will push nil when it has pushed enough values, and never call next() again, even though the source stream never emitted nil.
  2. You are implementing a generator for a "hot" source where it does not make sense to support backpressure. For example, if you are wrapping click events from the DOM, you would just call push whenever the event fires. There's no reason to ever call next.

@vqvu
Collaborator

vqvu commented Mar 21, 2017

It actually shows an unrelated bug in Highland. In the failure case, preFilterSpy.calls.length should be 1 and not 2, since you never call next() in your consume handler.

FYI, I've released 2.10.4, which should address this issue. The correct assert statement is

expect(preFilterSpy.calls.length).toBe(1);

Let me know if the new release causes any issues.

Edit: Fix typos.

@jaidetree
Contributor Author

jaidetree commented Mar 21, 2017

Cool, glad it uncovered something that may help others. Will give it a shot and report any issues that may come up.

Thanks for the explanation of the next() function in stream generation and consumption.

So for reading from an array you would probably want to use next so that data is only sent down when requested by the consumers:

let data = ['one', 'two', 'three', 'four'];
_((push, next) => {
    push(data.shift());
    if (data.length) next();
    else push(null, _.nil);
});

But for running a one-and-done like child_process.exec you wouldn't call next?

_((push) => {
    child_process.exec('ls', (err, stdout, stderr) => {
        if (err === null || err.code === 0) {
            push(null, stdout.toString('utf8'));
        }
        else if (err.code > 0) {
            push(stdout.toString('utf8') + '\n' + stderr.toString('utf8'));
        }
        else {
           push(err);
        }
       
        push(null, _.nil);
    });
});

@vqvu
Collaborator

vqvu commented Mar 22, 2017

Yes to both cases, though there's a typo in your array example. It should be using

push(null, data.shift())
